douping5015 2018-03-27 03:44
浏览 208
已采纳

Gorilla Websocket没有收到消息

I am building a gateway in Golang that communicates with Gorilla websockets.

I am running it on Ubuntu 16.04, and testing it currently with a .NET console app.

Using Wireshark on Windows and sniffit on Ubuntu have determined that messages are being sent correctly from the Windows client and received by the Ubuntu box.

In my code, however, sometimes after a few successful messages, and sometimes after none, my gateway is failing to read the message (still sitting at _, msg, errCon := conn.ReadMessage())

An example of the output will be:

2018/03/27 02:38:06 Awaiting Message ... 2018/03/27 02:38:07 Message received: main.AdminRequest{Data:"{\"SomeDataHeader\":\"SomeData\"}", Requestor:"user", Type:"JustDoSomethingRequest", Ukey:"talca"} 2018/03/27 02:38:07 {"SomeDataHeader":"SomeData"} 2018/03/27 02:38:07 Awaiting Message ...

As I have previously said, it may receive a few messages like this, but, despite network traffic on both ends continuing, no more messages will be received

I am fairly new with Golang, and working under the assumption that I am missing something.

I have trimmed out error handling and the like for the sake of brevity in the code below, but this is an example of the failing code.

EDIT As requested I have added Golang full code, and C# client code below (although, as I stated, Wireshark and sniffit have determined that data is going over the wire)

package main

import (
    "fmt"
    "net/http"
    "github.com/gorilla/websocket"
    "encoding/json"
    "log"
)

var upgrader = websocket.Upgrader{ ReadBufferSize:  1024, WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

type AdminRequest struct {
        Data      string `json:"Data"`
        Requestor string `json:"Requestor"`
        Type      string `json:"Type"`
        Ukey      string `json:"Ukey"`
    } 

func main() {

    http.HandleFunc("/a", func(w http.ResponseWriter, r *http.Request) {
        var conn, _ = upgrader.Upgrade(w, r, nil)

        go func(conn *websocket.Conn) {
            for {
                _, _, err := conn.ReadMessage()
                if err != nil {         
                    log.Println("Close: "+ err.Error())
                    conn.Close()
                    return
                }
            }
        }(conn)


        go func(conn *websocket.Conn) {
            for {

                log.Println("Awaiting Message ...")
                _, msg, errCon := conn.ReadMessage()

                if errCon != nil {
                    log.Println("Read Error:", errCon)
                    break
                }

                log.Println("Message received: ")

                var r AdminRequest

                if err := json.Unmarshal(msg, &r); err != nil {

                    log.Println("Error: " + err.Error());
                    return;
                }

                fmt.Printf("%#v
", r)
                log.Println(r.Data);
            }           
        }(conn)

    })

    http.ListenAndServe(":3000", nil)
}

C# Code:

public class Client : IDisposable
    {
        private ClientWebSocket _socket;


        string _address;
        int _port;
        public Client(string address)
        {
            _address = address;

            _socket = new ClientWebSocket();
        }

        public async void SetupForReceivingStuffs()
        {
            while (_socket.State == WebSocketState.Open)
            {
                ArraySegment<byte> receivedBytes = new ArraySegment<byte>(new byte[1024]);
                WebSocketReceiveResult result = await _socket.ReceiveAsync(receivedBytes, CancellationToken.None);

                Console.WriteLine(Encoding.UTF8.GetString(receivedBytes.Array, 0, result.Count));
            }
        }

        public async void SetupForSendingStuffs(ConcurrentQueue<AdminRequest> queue)
        {
            while (_socket.State == WebSocketState.Open)
            {
                AdminRequest next;

                while (queue.Count > 0)
                {
                    if (queue.TryDequeue(out next))
                    {
                        await Send(next);
                    }
                }

                await Task.Yield();
            }
        }

        public async Task Connect()
        {
            while (_socket.State != WebSocketState.Open)
            {
                try
                {
                    _socket = new ClientWebSocket();
                    await _socket.ConnectAsync(new Uri(_address), CancellationToken.None);

                    Console.WriteLine("Socket state: " + _socket.State);
                }
                catch (Exception ex)
                {
                    //Not getting hit
                    Console.WriteLine(ex.Message);
                    Console.WriteLine(ex.StackTrace);
                }
            }
        }



        public Task Send<TData>(TData data)
        {
            string text = JsonConvert.SerializeObject(data);

            var encoded = Encoding.UTF8.GetBytes(text);
            var buffer = new ArraySegment<Byte>(encoded, 0, encoded.Length);

            return _socket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
        }


        public void Dispose()
        {
            _socket.Dispose();
        }
    }

Called by:

class Program
{
    static ConcurrentQueue<AdminRequest> _toSend;

    static void Main(string[] args)
    {
        _toSend = new ConcurrentQueue<AdminRequest>();

        Client client = new Client("ws:/(myip):(myport)/a");
        client.Connect().Wait();

        //client.SetupForReceivingStuffs();
        client.SetupForSendingStuffs(_toSend);

        WriteInstructions();

        LoopAuto();

        Console.WriteLine("Bye");
    }

    private static void LoopAuto()
    {
        DateTime nextMessage = DateTime.Now;

        while (true)
        {

            if (DateTime.Now < nextMessage) continue;
            Console.WriteLine("Next");
            nextMessage = DateTime.Now.AddSeconds(2);

            _toSend.Enqueue(new AdminRequest
            {
                Data = "{\"SomeDataHeader\":\"SomeData\"}",
                Requestor = "user",
                Type = "JustDoSomethingRequest",
                Ukey = "talca"
            });
        }
    }

    private static ConsoleKeyInfo LoopManual()
    {
        ConsoleKeyInfo info;
        do
        {
            info = Console.ReadKey(true);

            if (info.KeyChar == '1')
            {
                _toSend.Enqueue(new AdminRequest
                {
                    Data = "{\"SomeDataHeader\":\"SomeData\"}",
                    Requestor = "user",
                    Type = "JustDoSomethingRequest",
                    Ukey = "talca"
                });
            }
            else if (info.KeyChar == 'i')
            {
                WriteInstructions();
            }

        } while (info.KeyChar != 'x');

        return info;
    }

    private static void WriteInstructions()
    {
        Console.WriteLine("1. Send New Message");
        Console.WriteLine("i. Instructions (these lines)");
        Console.WriteLine("x: Exit");
    }
}
  • 写回答

1条回答 默认 最新

  • dongzai2952 2018-03-27 23:34
    关注

    The application runs two goroutines that read messages in a loop. The first does nothing with the received message. The second parses and logs the message. You do not see any output because the first goroutine is receiving the messages.

    The first goroutine does not seem to serve any purpose. Delete the first goroutine to fix the problem.

    Deleting the first goroutine also fixes a data race. Concurrent reads on a websocket connection are not supported. The race detector will report this issue.

    Here's the updated code with other fixes and improvements. The net/http server calls handlers a per-connection goroutine. Use tha goroutine instead of starting yet another goroutine. Use the websocket package's JSON helper method.

    package main
    
    import (
        "fmt"
        "net/http"
        "github.com/gorilla/websocket"
        "encoding/json"
        "log"
    )
    
    var upgrader = websocket.Upgrader{ ReadBufferSize:  1024, WriteBufferSize: 1024,
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }
    
    type AdminRequest struct {
            Data      string `json:"Data"`
            Requestor string `json:"Requestor"`
            Type      string `json:"Type"`
            Ukey      string `json:"Ukey"`
        } 
    
    func wsHandler(w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            log.Println(err)
            return
        }
        defer conn.Close()
        for {
            var r AdminRequest
            if err := conn.ReadJSON(&r); err != nil {
                log.Println(err)
                break
            }
            fmt.Printf("%#v
    ", r)
            log.Println(r.Data);
        }           
    }
    
    func main() {
        http.HandleFunc("/a", wsHandler)
        http.ListenAndServe(":3000", nil)
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

    报告相同问题?

    悬赏问题

    • ¥15 Qt 不小心删除了自带的类,该怎么办
    • ¥15 我需要在PC端 开两个抖店工作台客户端.(语言-java)
    • ¥15 有没有哪位厉害的人可以用C#可视化呀
    • ¥15 可以帮我看看代码哪里错了吗
    • ¥15 设计一个成绩管理系统
    • ¥15 PCL注册的选点等函数如何取消注册
    • ¥15 问一下各位,为什么我用蓝牙直接发送模拟输入的数据,接收端显示乱码呢,米思齐软件上usb串口显示正常的字符串呢?
    • ¥15 Python爬虫程序
    • ¥15 crypto 这种的应该怎么找flag?
    • ¥15 代码已写好,求帮我指出错误,有偿!