dragon7088 2019-09-10 03:45
浏览 194

如何使用Jaeger追踪Kafka生产者和消费者

I want to tracing kafka producer and consumer by using jaeger and Go, but I can not get the chain of the span for the producer's tracer and consumer's tracer. Here is the producer's code:

func (p *Producer) WriteMessages(ctx context.Context, messages ...kafka.Message) error {
    topic := p.Stats().Topic
    for _, message := range messages {
        // span := opentracing.GlobalTracer().StartSpan("TO_"+topic, ext.SpanKindProducer)
        span, _ := opentracing.StartSpanFromContext(ctx, "TO_"+topic, ext.SpanKindProducer)
        ext.Component.Set(span, "golang-kafka")
        ext.PeerService.Set(span, "kafka")
        ext.MessageBusDestination.Set(span, topic)
        headers := make(map[string]string)
        opentracing.GlobalTracer().Inject(
            span.Context(),
            opentracing.TextMap,
            opentracing.TextMapCarrier(headers),
        )
        for key, value := range headers {
            header := kafka.Header{
                Key:   key,
                Value: []byte(value),
            }
            message.Headers = append(message.Headers, header)
        }
        span.Finish()
    }

    return p.Writer.WriteMessages(ctx, messages...)
}

and the consumer's code:

func (c *Consumer) ReadMessage(ctx context.Context) (kafka.Message, error) {
    message, err := c.Reader.ReadMessage(ctx)
    if err != nil {
        return kafka.Message{}, err
    }
    topic, partition, offset := message.Topic, message.Partition, message.Offset
    headers := make(map[string]string)
    for _, header := range message.Headers {
        headers[header.Key] = string(header.Value)
    }
    spanContext, _ := opentracing.GlobalTracer().Extract(
        opentracing.TextMap,
        opentracing.TextMapCarrier(headers),
    )
    span, _ := opentracing.StartSpanFromContext(ctx, "FROM_"+topic, opentracing.FollowsFrom(spanContext), ext.SpanKindConsumer)
    // span := opentracing.StartSpan("FROM_"+topic, opentracing.FollowsFrom(spanContext), ext.SpanKindConsumer)
    ext.Component.Set(span, "golang-kafka")
    ext.PeerService.Set(span, "kafka")
    span.SetTag("topic", topic)
    span.SetTag("partition", partition)
    span.SetTag("offset", offset)
    span.Finish()

    return message, err
}

I can get the producer chain and the consumer chain, but I can not chain them together.

  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥20 关于#硬件工程#的问题,请各位专家解答!
    • ¥15 关于#matlab#的问题:期望的系统闭环传递函数为G(s)=wn^2/s^2+2¢wn+wn^2阻尼系数¢=0.707,使系统具有较小的超调量
    • ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
    • ¥30 截图中的mathematics程序转换成matlab
    • ¥15 动力学代码报错,维度不匹配
    • ¥15 Power query添加列问题
    • ¥50 Kubernetes&Fission&Eleasticsearch
    • ¥15 報錯:Person is not mapped,如何解決?
    • ¥15 c++头文件不能识别CDialog
    • ¥15 Excel发现不可读取的内容