showliuzp 2025-10-18 21:59 采纳率: 84.3%
浏览 8

golang多协程读取变量一致性解决方案


//websocket基础定义
package tools

import (
    "github.com/google/uuid"
    "github.com/gorilla/websocket"
    "sync"
    "time"
    "sync/atomic"
)

//websocket连接
type WebSocketConn struct{
    ClientId        *string             //唯一标识一条连接
    Conn            *websocket.Conn     //主播对应的conn连接
    CreateAt        int64               //连接创建时间
    LatHeartbeatAt  int64               //心跳最后回复时间
    sync.Mutex
}

type WebSocketConnMap struct{
    ConnCount   int32                       //有效连接数
    ConnMap     map[*string]*WebSocketConn  //*string对应uuid
    sync.Mutex
}

var (
    WebSocketConnOnce       *WebSocketConn
    WebSocketConnMapOnce    *WebSocketConnMap
    )

func NewWebSocketConn() *WebSocketConn{
    var once sync.Once

    once.Do(func(){
        WebSocketConnOnce = new(WebSocketConn)
    })

    return WebSocketConnOnce
}

func NewWebSocketConnMap() *WebSocketConnMap{
    var once sync.Once

    once.Do(func(){
        WebSocketConnMapOnce         = new(WebSocketConnMap)
        WebSocketConnMapOnce.ConnMap = make(map[*string]*WebSocketConn,0)
    })

    return WebSocketConnMapOnce
}

//初始化
func init(){
    NewWebSocketConnMap()
    NewWebSocketConn()
}


//调用:
//缓存池
var ws_buff_pool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)
    },
}

//websocket缓冲区、跨域设置
var upgrader = websocket.Upgrader{
    ReadBufferSize  : ws_buff,
    WriteBufferSize : ws_buff,
    CheckOrigin     : chk_origin,       //设置跨域
    WriteBufferPool : &ws_buff_pool,    //设置缓存池
}

var ws_conn_obj = tools.WebSocketConnOnce
//创建一个新的websocket连接
func (ws *WsChat) WsChatCreateConn(user_info *models.LuUserInfo,w http.ResponseWriter, r *http.Request) (conn *websocket.Conn,err error){
    conn, err = upgrader.Upgrade(w, r, nil)
    if err != nil{
        return
    }

    var (
        exit_chan = make(chan types.Signal)
    )

    //连接建立成功,则下发唯一id,以后使用该id通信
    client_id := ws_conn_obj.CreateWebSocketConn(conn)

    //建立账号和连接的绑定关系,并作为链表的一个节点
    item := chat_websocket_conn{
        user_conn       : tools.WebSocketConnMapOnce.ConnMap[client_id],
        client_id       : *client_id,
        user_type       : (*user_info).Flag,
        user_id         : (*user_info).Id,
        user_nickname   : (*user_info).NickName,
    }

    linked_node := tools.SingleNode{Data:item}
    linked_list.LastAppend(&linked_node)

    bytes := pack_msg((*user_info).Id,types.INT64_ZERO,types.INT64_ZERO,(*user_info).NickName,types.STRING_ZERO,types.STRING_ZERO,types.STRING_ZERO,types.INT64_ZERO,types.WS_SYNC_SUCC,client_id)
    ws_conn_obj.WriteMsg(client_id, &bytes)

    //读取一个连接的数据
    go ws.WsChatMsgRead(client_id,item.user_conn.Conn)

    //写入一个连接的数据
    go ws.WsChatMsgWrite(client_id,item.user_conn.Conn)

    <-exit_chan

    return
}

WsChatMsgRead:
//读取客户端发送过来的消息
func (ws *WsChat) WsChatMsgRead(client_id *string,conn *websocket.Conn){
    defer func(){
        //从链表中删除每个主播的基础数据
        linked_list.DelLinkedNode(delete_linked_node_find(client_id))

        //从websocket连接map中删除该连接,并将连接总数减一
        tools.WebSocketConnMapOnce.CloseWebSocketMap(client_id)

        linked_list.Display()
    }()

    //设置读取大小,超过该值,连接关闭并返回1009错误码
    conn.SetReadLimit(ws_read_size_limit)

    //设置读取超时
    SetReadDeadline(ws_read_timeout_limit)

    for {
        msg_type, msg, err := conn.ReadMessage()
        if err != nil{
            if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
                logx.Error(fmt.Sprintf("tags:%+v,连接正常关闭",ws.LogTag))
            }else{
                logx.Error(fmt.Sprintf("tags:%+v,连接非正常关闭,err:%+v",ws.LogTag,err))
            }


            break
        }

        if msg_type != websocket.TextMessage{
            continue
        }

        fmt.Printf("收到.消息类型:%+v,消息内容:%+v,err:%+v\n",msg_type, string(msg), err)
    }

    return
}

WsChatMsgWrite:
//写入数据到客户端
func (ws *WsChat) WsChatMsgWrite(client_id *string,conn *websocket.Conn){
    conn.SetWriteDeadline(ws_write_timeout_limit)

    content : = []bytes("收到消息,谢谢")
    //限制读取500K
    if err := tools.WebSocketConnOnce.WriteMsg(content); err != nil {
        fmt.Printf("写入消息错误\n")
    }

    return
}


//WsChatMsgWrite、WsChatMsgRead都需要读取websocket.Conn里的Conn连接,但多协程处理的过程中,A协程改变了这个conn,B协程不一定能感知到,也就是内存模型的问题,atomic.Store是否可以解决这个问题呢
  • 写回答

4条回答 默认 最新

  • 阿里嘎多学长 2025-10-18 21:59
    关注

    阿里嘎多学长整理AIGC生成,因移动端显示问题导致当前答案未能完全显示,请使用PC端查看更加详细的解答过程

    解决方案

    在 Go 语言中,多协程读取变量一致性是一个常见的问题。以下是一些解决方案:

    1. 使用 Mutex

    你可以使用 sync.Mutex 来保护共享变量,确保只有一个协程可以访问它。

    import (
        "sync"
    )
    
    var mu sync.Mutex
    var sharedVar int
    
    func worker() {
        mu.Lock()
        defer mu.Unlock()
        // 读取或修改 sharedVar
    }
    
    func main() {
        go worker()
        // ...
    }
    

    2. 使用 RWMutex

    如果你只需要读取共享变量,而不需要修改它,可以使用 sync.RWMutex。这可以提高读取的性能,因为读取不需要锁定整个变量。

    import (
        "sync"
    )
    
    var mu sync.RWMutex
    var sharedVar int
    
    func worker() {
        mu.RLock()
        defer mu.RUnlock()
        // 读取 sharedVar
    }
    
    func main() {
        go worker()
        // ...
    }
    

    3. 使用 channel

    你可以使用 channel 来传递共享变量的值,避免共享变量本身。

    ch := make(chan int)
    
    func worker() {
        // 读取或修改 sharedVar
        ch <- sharedVar
    }
    
    func main() {
        go worker()
        // ...
        sharedVar = <-ch
    }
    

    4. 使用 atomic

    如果你只需要读取或修改共享变量的某个字段,可以使用 sync/atomic 包中的函数。

    import (
        "sync/atomic"
    )
    
    var sharedVar int32
    
    func worker() {
        atomic.StoreInt32(&sharedVar, 1)
    }
    
    func main() {
        go worker()
        // ...
        value := atomic.LoadInt32(&sharedVar)
    }
    

    5. 使用 goroutine-safe 数据结构

    你可以使用 goroutine-safe 的数据结构,如 sync.Mapsync.Pool,来避免共享变量本身。

    import (
        "sync"
    )
    
    var m sync.Map
    
    func worker() {
        m.Store("key", "value")
    }
    
    func main() {
        go worker()
        // ...
        value, _ := m.Load("key")
    }
    

    这些解决方案可以帮助你解决多协程读取变量一致性的问题。

    评论

报告相同问题?

问题事件

  • 创建了问题 10月18日