type WsChat struct{
LogTag string
}
func (ws *WsChat) ChatMsgKafkaRead(){
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: types.Config.Kafka.Addr,
Topic: types.Config.Kafka.ChatTopic,
GroupID: types.Config.Kafka.ChatGroupID,
})
defer r.Close()
for {
// 读取消息
read_msg, err := r.ReadMessage(ctx)
if err != nil{
logx.Error(fmt.Sprintf("tags:%+v,kafka消费异常.err:%+v", ws.LogTag,err))
break
}
}
//main.go文件
package main
import (
"x_dream_guild/controller"
"x_dream_guild/types"
"x_dream_guild/pkg"
"net/http"
"github.com/zeromicro/go-zero/rest"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/core/logx"
)
func main(){
srv, err := rest.NewServer(
rest.RestConf{
Port: types.Config.Port,
ServiceConf: service.ServiceConf{
Log: logx.LogConf{
Path: "./logs",
Compress:true,
ServiceName:"x_dream",
Mode:"console",
//Mode:"file",
Stat:false,
Rotation:"daily",
},
},
},
rest.WithNotFoundHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusNotFound)
w.Write([]byte(`{"code": 404, "msg": "请求的路由不存在", "data": null}"`))
})),
rest.WithNotAllowedHandler(tools.NewCorsMiddleware().Handler()),
)
srv.Use(tools.NewCorsMiddleware().Handle)
if err != nil {
logx.Error("服务初始化失败,err:%+v", err)
panic(err)
}
srv.AddRoutes(controller.Router())
defer func(){
srv.Stop()
err := types.CloseDB()
if err != nil{
logx.Error("关闭数据库连接失败,err:%+v", err)
}
}()
srv.Start()
}
现在的main.go是处理web请求的,我想在main.go文件里调用ChatMsgKafkaRead函数,但不要影响正常的web请求处理,应该怎么加
golang kafka消费放入main函数启动
- 写回答
- 好问题 0 提建议
- 关注问题
- 邀请回答
-
2条回答 默认 最新
关注让【宝码香车】来帮您解答,本回答参考DeepSeek编写,并整理提供。
如果答案让您满意,请采纳、点赞、关注,非常感谢!
.
每日一句正能量,让它带给您好心情:用心做好每一件小事,就是在为大事铺路。
.在 Go 语言中,要在不阻塞 Web 服务的情况下启动 Kafka 消费者,可以使用 goroutine 来并发执行。以下是修改后的
main.go文件:package main import ( "x_dream_guild/controller" "x_dream_guild/types" "x_dream_guild/pkg" "net/http" "github.com/zeromicro/go-zero/rest" "github.com/zeromicro/go-zero/core/service" "github.com/zeromicro/go-zero/core/logx" ) func main() { srv, err := rest.NewServer( rest.RestConf{ Port: types.Config.Port, ServiceConf: service.ServiceConf{ Log: logx.LogConf{ Path: "./logs", Compress: true, ServiceName: "x_dream", Mode: "console", //Mode: "file", Stat: false, Rotation: "daily", }, }, }, rest.WithNotFoundHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusNotFound) w.Write([]byte(`{"code": 404, "msg": "请求的路由不存在", "data": null}"`)) })), rest.WithNotAllowedHandler(tools.NewCorsMiddleware().Handler()), ) srv.Use(tools.NewCorsMiddleware().Handle) if err != nil { logx.Error("服务初始化失败,err:%+v", err) panic(err) } srv.AddRoutes(controller.Router()) // 启动 Kafka 消费者 go startKafkaConsumer() defer func() { srv.Stop() err := types.CloseDB() if err != nil { logx.Error("关闭数据库连接失败,err:%+v", err) } }() srv.Start() } // 启动 Kafka 消费者 func startKafkaConsumer() { ws := &WsChat{ LogTag: "kafka_consumer", } ws.ChatMsgKafkaRead() }关键修改说明:
- 使用 goroutine:通过
go startKafkaConsumer()在单独的 goroutine 中启动 Kafka 消费者,这样不会阻塞 Web 服务器的启动和运行。
- 创建独立的启动函数:
startKafkaConsumer()函数负责初始化WsChat结构体并调用消费方法。
注意事项:
- 错误处理:由于 Kafka 消费者运行在独立的 goroutine 中,需要确保其异常不会导致整个程序崩溃。你已经在
ChatMsgKafkaRead方法中有错误日志记录,这是正确的做法。
- 上下文管理:在你的
ChatMsgKafkaRead方法中使用了ctx,需要确保这个上下文在适当的时候被取消,以便优雅地关闭 Kafka 消费者。你可能需要修改该方法以接收上下文参数:
func (ws *WsChat) ChatMsgKafkaRead(ctx context.Context) { // ... 现有代码 }然后在
startKafkaConsumer中传递上下文:func startKafkaConsumer() { ctx := context.Background() ws := &WsChat{ LogTag: "kafka_consumer", } ws.ChatMsgKafkaRead(ctx) }- 资源清理:如果需要更精细的控制,可以考虑在程序退出时优雅地关闭 Kafka 消费者,但这需要修改你的 Kafka 读取器实现以支持上下文取消。
这样修改后,你的 Web 服务和 Kafka 消费者将并发运行,互不影响。
本回答被题主选为最佳回答 , 对您是否有帮助呢?解决 无用评论 打赏 举报- 使用 goroutine:通过