showliuzp 2025-10-23 21:26 采纳率: 83.8%
浏览 2
已结题

golang kafka消费放入main函数启动


type WsChat struct{
    LogTag string
}
func (ws *WsChat) ChatMsgKafkaRead(){
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: types.Config.Kafka.Addr,
        Topic:   types.Config.Kafka.ChatTopic,
        GroupID: types.Config.Kafka.ChatGroupID,
    })

    defer r.Close()

    for {
        // 读取消息
        read_msg, err := r.ReadMessage(ctx)
        if err != nil{
            logx.Error(fmt.Sprintf("tags:%+v,kafka消费异常.err:%+v", ws.LogTag,err))
            break
        }

  
}

//main.go文件
package main

import (
    "x_dream_guild/controller"
    "x_dream_guild/types"
    "x_dream_guild/pkg"
    "net/http"
    "github.com/zeromicro/go-zero/rest"
    "github.com/zeromicro/go-zero/core/service"
    "github.com/zeromicro/go-zero/core/logx"
)

func main(){
    srv, err := rest.NewServer(
        rest.RestConf{
            Port: types.Config.Port,
            ServiceConf: service.ServiceConf{
                Log: logx.LogConf{
                    Path: "./logs",
                    Compress:true,
                    ServiceName:"x_dream",
                    Mode:"console",
                    //Mode:"file",
                    Stat:false,
                    Rotation:"daily",
                },
            },
        },
        rest.WithNotFoundHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            w.Header().Set("Content-Type", "application/json")
            w.WriteHeader(http.StatusNotFound)
            w.Write([]byte(`{"code": 404, "msg": "请求的路由不存在", "data": null}"`))
        })),
        rest.WithNotAllowedHandler(tools.NewCorsMiddleware().Handler()),
    )

    srv.Use(tools.NewCorsMiddleware().Handle)

    if err != nil {
        logx.Error("服务初始化失败,err:%+v", err)
        panic(err)
    }

    srv.AddRoutes(controller.Router())

    defer func(){
        srv.Stop()

        err := types.CloseDB()
        if err != nil{
            logx.Error("关闭数据库连接失败,err:%+v", err)
        }
    }()

    srv.Start()
}


现在的main.go是处理web请求的,我想在main.go文件里调用ChatMsgKafkaRead函数,但不要影响正常的web请求处理,应该怎么加
  • 写回答

2条回答 默认 最新

  • 宝码香车 优质创作者: JavaScript技术领域 2025-10-23 21:31
    关注

    让【宝码香车】来帮您解答,本回答参考DeepSeek编写,并整理提供。

    如果答案让您满意,请采纳、点赞、关注,非常感谢!
    .
    每日一句正能量,让它带给您好心情:用心做好每一件小事,就是在为大事铺路。
    .

    在 Go 语言中,要在不阻塞 Web 服务的情况下启动 Kafka 消费者,可以使用 goroutine 来并发执行。以下是修改后的 main.go 文件:

    package main
    
    
    import (
        "x_dream_guild/controller"
        "x_dream_guild/types"
        "x_dream_guild/pkg"
        "net/http"
        "github.com/zeromicro/go-zero/rest"
        "github.com/zeromicro/go-zero/core/service"
        "github.com/zeromicro/go-zero/core/logx"
    )
    
    
    func main() {
        srv, err := rest.NewServer(
            rest.RestConf{
                Port: types.Config.Port,
                ServiceConf: service.ServiceConf{
                    Log: logx.LogConf{
                        Path: "./logs",
                        Compress: true,
                        ServiceName: "x_dream",
                        Mode: "console",
                        //Mode: "file",
                        Stat: false,
                        Rotation: "daily",
                    },
                },
            },
            rest.WithNotFoundHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                w.Header().Set("Content-Type", "application/json")
                w.WriteHeader(http.StatusNotFound)
                w.Write([]byte(`{"code": 404, "msg": "请求的路由不存在", "data": null}"`))
            })),
            rest.WithNotAllowedHandler(tools.NewCorsMiddleware().Handler()),
        )
    
    
        srv.Use(tools.NewCorsMiddleware().Handle)
    
    
        if err != nil {
            logx.Error("服务初始化失败,err:%+v", err)
            panic(err)
        }
    
    
        srv.AddRoutes(controller.Router())
    
    
        // 启动 Kafka 消费者
        go startKafkaConsumer()
    
    
        defer func() {
            srv.Stop()
    
    
            err := types.CloseDB()
            if err != nil {
                logx.Error("关闭数据库连接失败,err:%+v", err)
            }
        }()
    
    
        srv.Start()
    }
    
    
    // 启动 Kafka 消费者
    func startKafkaConsumer() {
        ws := &WsChat{
            LogTag: "kafka_consumer",
        }
        ws.ChatMsgKafkaRead()
    }
    

    关键修改说明:

    1. 使用 goroutine:通过 go startKafkaConsumer() 在单独的 goroutine 中启动 Kafka 消费者,这样不会阻塞 Web 服务器的启动和运行。
    1. 创建独立的启动函数startKafkaConsumer() 函数负责初始化 WsChat 结构体并调用消费方法。

    注意事项:

    1. 错误处理:由于 Kafka 消费者运行在独立的 goroutine 中,需要确保其异常不会导致整个程序崩溃。你已经在 ChatMsgKafkaRead 方法中有错误日志记录,这是正确的做法。
    1. 上下文管理:在你的 ChatMsgKafkaRead 方法中使用了 ctx,需要确保这个上下文在适当的时候被取消,以便优雅地关闭 Kafka 消费者。你可能需要修改该方法以接收上下文参数:
    func (ws *WsChat) ChatMsgKafkaRead(ctx context.Context) {
        // ... 现有代码
    }
    

    然后在 startKafkaConsumer 中传递上下文:

    func startKafkaConsumer() {
        ctx := context.Background()
        ws := &WsChat{
            LogTag: "kafka_consumer",
        }
        ws.ChatMsgKafkaRead(ctx)
    }
    
    1. 资源清理:如果需要更精细的控制,可以考虑在程序退出时优雅地关闭 Kafka 消费者,但这需要修改你的 Kafka 读取器实现以支持上下文取消。

    这样修改后,你的 Web 服务和 Kafka 消费者将并发运行,互不影响。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 10月31日
  • 已采纳回答 10月23日
  • 创建了问题 10月23日