douping5015
2018-03-27 03:44 阅读 136
已采纳

Gorilla Websocket没有收到消息

I am building a gateway in Golang that communicates with Gorilla websockets.

I am running it on Ubuntu 16.04, and testing it currently with a .NET console app.

Using Wireshark on Windows and sniffit on Ubuntu, I have determined that messages are being sent correctly from the Windows client and received by the Ubuntu box.

In my code, however, sometimes after a few successful messages, and sometimes after none, my gateway is failing to read the message (still sitting at _, msg, errCon := conn.ReadMessage())

An example of the output will be:

2018/03/27 02:38:06 Awaiting Message ...
2018/03/27 02:38:07 Message received:
main.AdminRequest{Data:"{\"SomeDataHeader\":\"SomeData\"}", Requestor:"user", Type:"JustDoSomethingRequest", Ukey:"talca"}
2018/03/27 02:38:07 {"SomeDataHeader":"SomeData"}
2018/03/27 02:38:07 Awaiting Message ...

As I have previously said, it may receive a few messages like this, but, despite network traffic on both ends continuing, no more messages will be received

I am fairly new with Golang, and working under the assumption that I am missing something.

I have trimmed out error handling and the like for the sake of brevity in the code below, but this is an example of the failing code.

EDIT: As requested, I have added the full Golang code and the C# client code below (although, as I stated, Wireshark and sniffit have determined that data is going over the wire).

package main

import (
    "fmt"
    "net/http"
    "github.com/gorilla/websocket"
    "encoding/json"
    "log"
)

// upgrader turns incoming HTTP requests into websocket connections using
// 1024-byte read and write buffers. Its CheckOrigin callback reports true
// for every request, so no origin is rejected.
var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(_ *http.Request) bool {
        return true
    },
}

// AdminRequest is the message body the gateway decodes from each websocket
// message. Every field is a string and is mapped to a JSON key with the same
// name as the field.
type AdminRequest struct {
    Data      string `json:"Data"`
    Requestor string `json:"Requestor"`
    Type      string `json:"Type"`
    Ukey      string `json:"Ukey"`
}

func main() {

    http.HandleFunc("/a", func(w http.ResponseWriter, r *http.Request) {
        var conn, _ = upgrader.Upgrade(w, r, nil)

        go func(conn *websocket.Conn) {
            for {
                _, _, err := conn.ReadMessage()
                if err != nil {         
                    log.Println("Close: "+ err.Error())
                    conn.Close()
                    return
                }
            }
        }(conn)


        go func(conn *websocket.Conn) {
            for {

                log.Println("Awaiting Message ...")
                _, msg, errCon := conn.ReadMessage()

                if errCon != nil {
                    log.Println("Read Error:", errCon)
                    break
                }

                log.Println("Message received: ")

                var r AdminRequest

                if err := json.Unmarshal(msg, &r); err != nil {

                    log.Println("Error: " + err.Error());
                    return;
                }

                fmt.Printf("%#v
", r)
                log.Println(r.Data);
            }           
        }(conn)

    })

    http.ListenAndServe(":3000", nil)
}

C# Code:

/// <summary>
/// Test client that wraps a <c>ClientWebSocket</c>: it connects to the address
/// given at construction, sends objects as JSON text messages, and can print
/// whatever it receives to the console.
/// </summary>
public class Client : IDisposable
    {
        // Current socket; Connect() replaces it with a new instance on every attempt.
        private ClientWebSocket _socket;


        // URI string passed to ConnectAsync.
        string _address;
        // Never assigned or read anywhere in this class.
        int _port;
        /// <summary>
        /// Stores the target address and creates an initial, unconnected socket.
        /// </summary>
        /// <param name="address">URI of the websocket endpoint.</param>
        public Client(string address)
        {
            _address = address;

            _socket = new ClientWebSocket();
        }

        /// <summary>
        /// While the socket is open, receives into a fresh 1024-byte buffer and
        /// writes the received bytes to the console as UTF-8 text.
        /// </summary>
        /// <remarks>
        /// Declared <c>async void</c>, so the caller cannot await it or observe
        /// its exceptions. <c>result.EndOfMessage</c> is not checked, so each
        /// receive is printed on its own regardless of message boundaries.
        /// </remarks>
        public async void SetupForReceivingStuffs()
        {
            while (_socket.State == WebSocketState.Open)
            {
                ArraySegment<byte> receivedBytes = new ArraySegment<byte>(new byte[1024]);
                WebSocketReceiveResult result = await _socket.ReceiveAsync(receivedBytes, CancellationToken.None);

                // Only the first result.Count bytes of the buffer hold received data.
                Console.WriteLine(Encoding.UTF8.GetString(receivedBytes.Array, 0, result.Count));
            }
        }

        /// <summary>
        /// While the socket is open, repeatedly drains <paramref name="queue"/>
        /// and sends each dequeued request, awaiting every send before the next.
        /// </summary>
        /// <remarks>
        /// Declared <c>async void</c> (fire-and-forget). The loop polls the queue
        /// with no delay between passes other than <c>Task.Yield()</c>.
        /// </remarks>
        /// <param name="queue">Requests waiting to be sent.</param>
        public async void SetupForSendingStuffs(ConcurrentQueue<AdminRequest> queue)
        {
            while (_socket.State == WebSocketState.Open)
            {
                AdminRequest next;

                while (queue.Count > 0)
                {
                    if (queue.TryDequeue(out next))
                    {
                        await Send(next);
                    }
                }

                // Hand control back before polling the queue again.
                await Task.Yield();
            }
        }

        /// <summary>
        /// Retries until the socket reports <c>WebSocketState.Open</c>. Each
        /// attempt replaces <c>_socket</c> with a new <c>ClientWebSocket</c>;
        /// the previous instance is not disposed here. A failed attempt prints
        /// the exception and the loop retries immediately.
        /// </summary>
        public async Task Connect()
        {
            while (_socket.State != WebSocketState.Open)
            {
                try
                {
                    _socket = new ClientWebSocket();
                    await _socket.ConnectAsync(new Uri(_address), CancellationToken.None);

                    Console.WriteLine("Socket state: " + _socket.State);
                }
                catch (Exception ex)
                {
                    //Not getting hit
                    Console.WriteLine(ex.Message);
                    Console.WriteLine(ex.StackTrace);
                }
            }
        }



        /// <summary>
        /// Serializes <paramref name="data"/> to JSON with
        /// <c>JsonConvert.SerializeObject</c>, encodes it as UTF-8, and sends it
        /// as a single text message (the end-of-message argument is <c>true</c>).
        /// </summary>
        /// <returns>The task returned by <c>SendAsync</c>.</returns>
        public Task Send<TData>(TData data)
        {
            string text = JsonConvert.SerializeObject(data);

            var encoded = Encoding.UTF8.GetBytes(text);
            var buffer = new ArraySegment<Byte>(encoded, 0, encoded.Length);

            return _socket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
        }


        /// <summary>
        /// Disposes the socket currently held in <c>_socket</c>.
        /// </summary>
        public void Dispose()
        {
            _socket.Dispose();
        }
    }

Called by:

/// <summary>
/// Console driver for <c>Client</c>: connects, starts the send loop, then
/// enqueues a fixed test request every two seconds.
/// </summary>
class Program
{
    // Requests shared between the producer loops below and the client's send loop.
    static ConcurrentQueue<AdminRequest> _toSend;

    /// <summary>
    /// Creates the queue and the client, blocks until the client is connected,
    /// starts the send loop, prints the instructions, and enters
    /// <c>LoopAuto</c>.
    /// </summary>
    static void Main(string[] args)
    {
        _toSend = new ConcurrentQueue<AdminRequest>();

        // NOTE(review): the address looks like a redacted placeholder; as written it has a single slash after "ws:" — confirm the real value.
        Client client = new Client("ws:/(myip):(myport)/a");
        // Blocks the main thread until Connect() completes.
        client.Connect().Wait();

        //client.SetupForReceivingStuffs();
        // async void: the call is not awaited, so Main continues without waiting for the send loop to finish.
        client.SetupForSendingStuffs(_toSend);

        WriteInstructions();

        // LoopAuto has no exit path, so the line after it is never reached.
        LoopAuto();

        Console.WriteLine("Bye");
    }

    /// <summary>
    /// Enqueues the same test request every two seconds, forever. The wait is a
    /// busy loop: it re-checks the clock continuously without sleeping.
    /// </summary>
    private static void LoopAuto()
    {
        DateTime nextMessage = DateTime.Now;

        while (true)
        {

            // Spin until the next send time has been reached.
            if (DateTime.Now < nextMessage) continue;
            Console.WriteLine("Next");
            nextMessage = DateTime.Now.AddSeconds(2);

            _toSend.Enqueue(new AdminRequest
            {
                Data = "{\"SomeDataHeader\":\"SomeData\"}",
                Requestor = "user",
                Type = "JustDoSomethingRequest",
                Ukey = "talca"
            });
        }
    }

    /// <summary>
    /// Keyboard-driven alternative to <c>LoopAuto</c> (not called in the code
    /// shown): '1' enqueues the test request, 'i' prints the instructions, and
    /// 'x' ends the loop.
    /// </summary>
    /// <returns>The key press that ended the loop.</returns>
    private static ConsoleKeyInfo LoopManual()
    {
        ConsoleKeyInfo info;
        do
        {
            // true = do not echo the pressed key to the console.
            info = Console.ReadKey(true);

            if (info.KeyChar == '1')
            {
                _toSend.Enqueue(new AdminRequest
                {
                    Data = "{\"SomeDataHeader\":\"SomeData\"}",
                    Requestor = "user",
                    Type = "JustDoSomethingRequest",
                    Ukey = "talca"
                });
            }
            else if (info.KeyChar == 'i')
            {
                WriteInstructions();
            }

        } while (info.KeyChar != 'x');

        return info;
    }

    /// <summary>
    /// Prints the list of keyboard commands to the console.
    /// </summary>
    private static void WriteInstructions()
    {
        Console.WriteLine("1. Send New Message");
        Console.WriteLine("i. Instructions (these lines)");
        Console.WriteLine("x: Exit");
    }
}
  • 点赞
  • 写回答
  • 关注问题
  • 收藏
  • 复制链接分享

1条回答 默认 最新

  • 已采纳
    dongzai2952 dongzai2952 2018-03-27 23:34

    The application runs two goroutines that read messages in a loop. The first does nothing with the received message. The second parses and logs the message. You do not see any output because the first goroutine is receiving the messages.

    The first goroutine does not seem to serve any purpose. Delete the first goroutine to fix the problem.

    Deleting the first goroutine also fixes a data race. Concurrent reads on a websocket connection are not supported. The race detector will report this issue.

    Here's the updated code with other fixes and improvements. The net/http server calls handlers in a per-connection goroutine. Use that goroutine instead of starting yet another goroutine. Use the websocket package's JSON helper method.

    package main
    
    import (
        "fmt"
        "net/http"
        "github.com/gorilla/websocket"
        "encoding/json"
        "log"
    )
    
    // upgrader turns incoming HTTP requests into websocket connections using
    // 1024-byte read and write buffers. Its CheckOrigin callback reports true
    // for every request, so no origin is rejected.
    var upgrader = websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
        CheckOrigin: func(_ *http.Request) bool {
            return true
        },
    }
    
    // AdminRequest is the message body decoded from each websocket message.
    // Every field is a string and is mapped to a JSON key with the same name
    // as the field.
    type AdminRequest struct {
        Data      string `json:"Data"`
        Requestor string `json:"Requestor"`
        Type      string `json:"Type"`
        Ukey      string `json:"Ukey"`
    }
    
    func wsHandler(w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            log.Println(err)
            return
        }
        defer conn.Close()
        for {
            var r AdminRequest
            if err := conn.ReadJSON(&r); err != nil {
                log.Println(err)
                break
            }
            fmt.Printf("%#v
    ", r)
            log.Println(r.Data);
        }           
    }
    
    // main registers wsHandler at "/a" and serves HTTP on port 3000.
    func main() {
        http.HandleFunc("/a", wsHandler)
        // ListenAndServe only returns when the server cannot run (for example
        // the port is already in use), so report the error instead of exiting
        // quietly.
        log.Fatal(http.ListenAndServe(":3000", nil))
    }
    
    点赞 评论 复制链接分享

相关推荐