douping5015 2018-03-27 03:44
208 views
Accepted

Gorilla WebSocket not receiving messages

I am building a gateway in Golang that communicates with Gorilla websockets.

I am running it on Ubuntu 16.04, and testing it currently with a .NET console app.

Using Wireshark on Windows and sniffit on Ubuntu, I have determined that messages are being sent correctly from the Windows client and received by the Ubuntu box.

In my code, however, sometimes after a few successful messages and sometimes after none, my gateway fails to read the next message (it is still blocked at _, msg, errCon := conn.ReadMessage()).

An example of the output will be:

2018/03/27 02:38:06 Awaiting Message ...
2018/03/27 02:38:07 Message received:
main.AdminRequest{Data:"{\"SomeDataHeader\":\"SomeData\"}", Requestor:"user", Type:"JustDoSomethingRequest", Ukey:"talca"}
2018/03/27 02:38:07 {"SomeDataHeader":"SomeData"}
2018/03/27 02:38:07 Awaiting Message ...

As I said, it may receive a few messages like this, but then, despite network traffic continuing on both ends, no more messages are received.

I am fairly new to Golang and am working under the assumption that I am missing something.

I have trimmed out error handling and the like for the sake of brevity in the code below, but this is an example of the failing code.

EDIT: As requested, I have added the full Golang code and the C# client code below (although, as I stated, Wireshark and sniffit show that the data is going over the wire).

package main

import (
    "fmt"
    "net/http"
    "github.com/gorilla/websocket"
    "encoding/json"
    "log"
)

var upgrader = websocket.Upgrader{ ReadBufferSize:  1024, WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

type AdminRequest struct {
        Data      string `json:"Data"`
        Requestor string `json:"Requestor"`
        Type      string `json:"Type"`
        Ukey      string `json:"Ukey"`
    } 

func main() {

    http.HandleFunc("/a", func(w http.ResponseWriter, r *http.Request) {
        var conn, _ = upgrader.Upgrade(w, r, nil)

        go func(conn *websocket.Conn) {
            for {
                _, _, err := conn.ReadMessage()
                if err != nil {         
                    log.Println("Close: "+ err.Error())
                    conn.Close()
                    return
                }
            }
        }(conn)


        go func(conn *websocket.Conn) {
            for {

                log.Println("Awaiting Message ...")
                _, msg, errCon := conn.ReadMessage()

                if errCon != nil {
                    log.Println("Read Error:", errCon)
                    break
                }

                log.Println("Message received: ")

                var r AdminRequest

                if err := json.Unmarshal(msg, &r); err != nil {

                    log.Println("Error: " + err.Error());
                    return;
                }

                fmt.Printf("%#v\n", r)
                log.Println(r.Data);
            }           
        }(conn)

    })

    http.ListenAndServe(":3000", nil)
}

C# Code:

public class Client : IDisposable
    {
        private ClientWebSocket _socket;


        string _address;
        int _port;
        public Client(string address)
        {
            _address = address;

            _socket = new ClientWebSocket();
        }

        public async void SetupForReceivingStuffs()
        {
            while (_socket.State == WebSocketState.Open)
            {
                ArraySegment<byte> receivedBytes = new ArraySegment<byte>(new byte[1024]);
                WebSocketReceiveResult result = await _socket.ReceiveAsync(receivedBytes, CancellationToken.None);

                Console.WriteLine(Encoding.UTF8.GetString(receivedBytes.Array, 0, result.Count));
            }
        }

        public async void SetupForSendingStuffs(ConcurrentQueue<AdminRequest> queue)
        {
            while (_socket.State == WebSocketState.Open)
            {
                AdminRequest next;

                while (queue.Count > 0)
                {
                    if (queue.TryDequeue(out next))
                    {
                        await Send(next);
                    }
                }

                await Task.Yield();
            }
        }

        public async Task Connect()
        {
            while (_socket.State != WebSocketState.Open)
            {
                try
                {
                    _socket = new ClientWebSocket();
                    await _socket.ConnectAsync(new Uri(_address), CancellationToken.None);

                    Console.WriteLine("Socket state: " + _socket.State);
                }
                catch (Exception ex)
                {
                    //Not getting hit
                    Console.WriteLine(ex.Message);
                    Console.WriteLine(ex.StackTrace);
                }
            }
        }



        public Task Send<TData>(TData data)
        {
            string text = JsonConvert.SerializeObject(data);

            var encoded = Encoding.UTF8.GetBytes(text);
            var buffer = new ArraySegment<Byte>(encoded, 0, encoded.Length);

            return _socket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
        }


        public void Dispose()
        {
            _socket.Dispose();
        }
    }

Called by:

class Program
{
    static ConcurrentQueue<AdminRequest> _toSend;

    static void Main(string[] args)
    {
        _toSend = new ConcurrentQueue<AdminRequest>();

        Client client = new Client("ws:/(myip):(myport)/a");
        client.Connect().Wait();

        //client.SetupForReceivingStuffs();
        client.SetupForSendingStuffs(_toSend);

        WriteInstructions();

        LoopAuto();

        Console.WriteLine("Bye");
    }

    private static void LoopAuto()
    {
        DateTime nextMessage = DateTime.Now;

        while (true)
        {

            if (DateTime.Now < nextMessage) continue;
            Console.WriteLine("Next");
            nextMessage = DateTime.Now.AddSeconds(2);

            _toSend.Enqueue(new AdminRequest
            {
                Data = "{\"SomeDataHeader\":\"SomeData\"}",
                Requestor = "user",
                Type = "JustDoSomethingRequest",
                Ukey = "talca"
            });
        }
    }

    private static ConsoleKeyInfo LoopManual()
    {
        ConsoleKeyInfo info;
        do
        {
            info = Console.ReadKey(true);

            if (info.KeyChar == '1')
            {
                _toSend.Enqueue(new AdminRequest
                {
                    Data = "{\"SomeDataHeader\":\"SomeData\"}",
                    Requestor = "user",
                    Type = "JustDoSomethingRequest",
                    Ukey = "talca"
                });
            }
            else if (info.KeyChar == 'i')
            {
                WriteInstructions();
            }

        } while (info.KeyChar != 'x');

        return info;
    }

    private static void WriteInstructions()
    {
        Console.WriteLine("1. Send New Message");
        Console.WriteLine("i. Instructions (these lines)");
        Console.WriteLine("x: Exit");
    }
}

1 answer

  • dongzai2952 2018-03-27 23:34

    The application runs two goroutines that read messages in a loop. The first does nothing with the received message. The second parses and logs the message. You do not see any output because the first goroutine is receiving the messages.

    The first goroutine does not seem to serve any purpose. Delete the first goroutine to fix the problem.

    Deleting the first goroutine also fixes a data race. Concurrent reads on a websocket connection are not supported. The race detector will report this issue.
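The effect of two read loops sharing one connection can be imitated with the standard library alone. The following is a minimal sketch, not the gorilla/websocket API: a channel stands in for the connection, and splitBetweenReaders is a hypothetical helper invented for this illustration. A channel makes the split well defined, whereas concurrent reads on a real websocket connection are a data race, so treat this only as a model of why the logging loop misses messages.

```go
package main

import (
	"fmt"
	"sync"
)

// splitBetweenReaders simulates two loops reading from one message source.
// A channel stands in for the websocket connection (an assumption made for
// illustration). Each message is delivered to exactly one reader, so the
// logging reader only sees what the discarding reader did not take first.
func splitBetweenReaders(messages []string) (discarded, logged []string) {
	source := make(chan string)
	var wg sync.WaitGroup
	wg.Add(2)

	// First reader: receives messages and ignores them, like the first
	// goroutine in the question.
	go func() {
		defer wg.Done()
		for m := range source {
			discarded = append(discarded, m)
		}
	}()

	// Second reader: the loop that is supposed to process every message.
	go func() {
		defer wg.Done()
		for m := range source {
			logged = append(logged, m)
		}
	}()

	for _, m := range messages {
		source <- m
	}
	close(source)
	wg.Wait() // both slices are safe to read once the readers have exited
	return discarded, logged
}

func main() {
	msgs := []string{"m1", "m2", "m3", "m4", "m5", "m6"}
	discarded, logged := splitBetweenReaders(msgs)
	fmt.Printf("discarding reader took %d, logging reader took %d of %d messages\n",
		len(discarded), len(logged), len(msgs))
}
```

How the messages are divided varies from run to run, which matches the symptom in the question: sometimes a few messages are logged, sometimes none.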

    Here's the updated code with other fixes and improvements. The net/http server calls each handler in a per-connection goroutine. Use that goroutine instead of starting yet another one. Use the websocket package's JSON helper method.

    package main
    
    import (
        "fmt"
        "log"
        "net/http"

        "github.com/gorilla/websocket"
    )
    
    var upgrader = websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }
    
    type AdminRequest struct {
        Data      string `json:"Data"`
        Requestor string `json:"Requestor"`
        Type      string `json:"Type"`
        Ukey      string `json:"Ukey"`
    }
    
    func wsHandler(w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            log.Println(err)
            return
        }
        defer conn.Close()
        for {
            // Named req so it does not shadow the *http.Request parameter r.
            var req AdminRequest
            if err := conn.ReadJSON(&req); err != nil {
                log.Println(err)
                break
            }
            fmt.Printf("%#v\n", req)
            log.Println(req.Data)
        }
    }
    
    func main() {
        http.HandleFunc("/a", wsHandler)
        log.Fatal(http.ListenAndServe(":3000", nil))
    }
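
The JSON helper used above reads one message and decodes it with encoding/json. The sketch below shows only that decoding step, using the standard library, so it runs without a websocket connection. The wire format in main is an assumption reconstructed from the log output in the question, and decodeRequest is a hypothetical helper, not part of gorilla/websocket. It also decodes the Data field a second time, because that field carries a JSON document as a string.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// AdminRequest mirrors the struct from the question.
type AdminRequest struct {
	Data      string `json:"Data"`
	Requestor string `json:"Requestor"`
	Type      string `json:"Type"`
	Ukey      string `json:"Ukey"`
}

// decodeRequest decodes one text message into an AdminRequest and then
// decodes the JSON document carried as a string in its Data field.
func decodeRequest(msg []byte) (AdminRequest, map[string]string, error) {
	var req AdminRequest
	if err := json.Unmarshal(msg, &req); err != nil {
		return req, nil, fmt.Errorf("decode request: %w", err)
	}
	var inner map[string]string
	if err := json.Unmarshal([]byte(req.Data), &inner); err != nil {
		return req, nil, fmt.Errorf("decode Data: %w", err)
	}
	return req, inner, nil
}

func main() {
	// Assumed wire format, reconstructed from the log output in the question.
	msg := []byte(`{"Data":"{\"SomeDataHeader\":\"SomeData\"}","Requestor":"user","Type":"JustDoSomethingRequest","Ukey":"talca"}`)
	req, inner, err := decodeRequest(msg)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(req.Type, inner["SomeDataHeader"]) // prints "JustDoSomethingRequest SomeData"
}
```

Returning an error instead of logging inside the helper lets the read loop decide whether a malformed message should close the connection or just be skipped.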
    