duankong8998 2017-03-24 15:39
浏览 272
已采纳

Golang Gorilla Websocket在120秒后停止接收信息

I'm currently trying to connect to the CEX.IO bitcoin exchange's websocket, but have been having issues not only with CEX.IO but with others too. All of my connections drop around the 120-second mark which makes me think there is some TTL problem going on. The Process() goroutine in the main package ends up just hanging and waiting for data from the readLoop which just stops receiving data. I've included some read-only API keys in the code so you can test if you'd like.

package main

import (
  "fmt"
  "bitbucket.org/tradedefender/cryptocurrency/exchange-connector/cexio"
  "github.com/shopspring/decimal"
  "encoding/json"
  "time"
)

type OrderBook struct {
  Asks []Ask
  Bids []Bid
}

type Ask struct {
  Rate    decimal.Decimal
  Amount  decimal.Decimal
}

type Bid struct {
  Rate    decimal.Decimal
  Amount  decimal.Decimal
}

func main() {
  cexioConn := new(cexio.Connection)

  err := cexioConn.Connect()
  if err != nil {
    fmt.Errorf("error: %s", err.Error())
  }

  err = cexioConn.Authenticate("TLwYkktLf7Im6nqSKt6UO1IrU", "9ImOJcR7Qj3LMIyPCzky0D7WE")
  if err != nil {
    fmt.Errorf("error: %s", err.Error())
  }

  readChannel := make(chan cexio.IntraAppMessage, 25)

  go cexioConn.ReadLoop(readChannel)

  processor := Processor{
    WatchPairs: [][2]string{
      [2]string{
        "BTC", "USD",
      },
    },
    conn: cexioConn,
  }

  go processor.Process(readChannel)

  // LOL
  for {
    continue
  }

}

type Processor struct {
  WatchPairs [][2]string
  conn *cexio.Connection
}

func (p *Processor) Process(ch <-chan cexio.IntraAppMessage) {

  p.conn.SubscribeToOrderBook(p.WatchPairs[0])

  pingTimer := time.Now().Unix()
  for {

    fmt.Printf("(%v)
", time.Now().Unix())

    if (time.Now().Unix() - pingTimer) >= 10 {
      fmt.Println("sending ping")
      p.conn.SendPing()
      pingTimer = time.Now().Unix()
    }

    readMsg := <- ch
    output, _ := json.Marshal(readMsg.SocketMessage)
    fmt.Println(string(output))

    if readMsg.SocketMessage.Event == "ping" {
      fmt.Println("sending pong")
      p.conn.SendPong()
      pingTimer = time.Now().Unix()
    }

  }
}

Below is the connector to the cexio websocket. Here is a link to their API: https://cex.io/websocket-api

package cexio

import (
  "github.com/gorilla/websocket"
  //"github.com/shopspring/decimal"
  "github.com/satori/go.uuid"
  "encoding/hex"
  "encoding/json"
  "crypto/hmac"
  "crypto/sha256"
  "bytes"
  "strconv"
  "time"
  "fmt"
)

const Url = "wss://ws.cex.io/ws/"

type Connection struct {
  conn *websocket.Conn
}

type IntraAppMessage struct {
  SocketMessage   GenericMessage
  ProgramMessage  ProgramMessage
}

type GenericMessage struct {
  Event   string      `json:"e"`
  Data    interface{} `json:"data"`
  Auth    AuthData    `json:"auth,omitempty"`
  Ok      string      `json:"ok,omitempty"`
  Oid     string      `json:"oid,omitempty"`
  Time    int64       `json:"time,omitempty"`
}

type ProgramMessage struct {
  Error   string
}

type AuthData struct {
  Key       string  `json:"key"`
  Signature string  `json:"signature"`
  Timestamp int64   `json:"timestamp"`
}

type OrderBookSubscribeData struct {
  Pair      [2]string   `json:"pair"`
  Subscribe bool        `json:"subscribe"`
  Depth     int         `json:"depth"`
}

func (c *Connection) SendPong() error {

  pongMsg := GenericMessage{
    Event: "pong",
  }

  err := c.conn.WriteJSON(pongMsg)
  if err != nil {
    return nil
  }

  deadline := time.Now().Add(15*time.Second)

  err = c.conn.WriteControl(websocket.PongMessage, nil, deadline)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) SendPing() error {

  pingMsg := GenericMessage{
    Event: "get-balance",
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(pingMsg)
  if err != nil {
    return err
  }

  deadline := time.Now().Add(15*time.Second)

  err = c.conn.WriteControl(websocket.PingMessage, nil, deadline)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) Connect() error {
  dialer := *websocket.DefaultDialer
  wsConn, _, err := dialer.Dial(Url, nil)
  if err != nil {
    return err
  }

  c.conn = wsConn
  //c.conn.SetPingHandler(c.HandlePing)

  for {

    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      c.Disconnect()
      return err
    }

    fmt.Println(string(msgBytes))

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      c.Disconnect()
      return err
    }

    if m.Event != "connected" {
      c.Disconnect()
      return err
    } else {
      break
    }

  }

  return nil
}

func (c *Connection) Disconnect() error {
  return c.conn.Close()
}

func (c *Connection) ReadLoop(ch chan<- IntraAppMessage) {
  for {

    fmt.Println("starting new read")

    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      ch <- IntraAppMessage{
        ProgramMessage: ProgramMessage{
          Error: err.Error(),
        },
      }
      continue
    }

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      ch <- IntraAppMessage{
        ProgramMessage: ProgramMessage{
          Error: err.Error(),
        },
      }
      continue
    }

    ch <- IntraAppMessage{
      SocketMessage: m,
    }

  }
}

func CreateSignature(timestamp int64, key, secret string) string {
  secretBytes := []byte(secret)
  h := hmac.New(sha256.New, secretBytes)

  var buffer bytes.Buffer
  buffer.WriteString(strconv.FormatInt(timestamp, 10))
  buffer.WriteString(key)

  h.Write(buffer.Bytes())

  return hex.EncodeToString(h.Sum(nil))
}

func (c *Connection) Authenticate(key, secret string) error {

  timestamp := time.Now().Unix()
  signature := CreateSignature(timestamp, key, secret)

  var authMsg GenericMessage
  authMsg.Event = "auth"
  authMsg.Auth = AuthData{
    Key: key,
    Signature: signature,
    Timestamp: timestamp,
  }

  err := c.conn.WriteJSON(authMsg)
  if err != nil {
    return err
  }

  for {
    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      c.Disconnect()
      return err
    }

    fmt.Println(string(msgBytes))

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      c.Disconnect()
      return err
    }

    if m.Event != "auth" && m.Ok != "ok" {
      c.Disconnect()
      return err
    } else {
      break
    }
  }

  return nil

}

func (c *Connection) SubscribeToOrderBook(pair [2]string) error {

  sendMsg := GenericMessage{
    Event: "order-book-subscribe",
    Data: OrderBookSubscribeData{
      Pair: pair,
      Subscribe: true,
      Depth: 0,
    },
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(sendMsg)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) GetBalance() error {

  sendMsg := GenericMessage{
    Event: "get-balance",
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(sendMsg)
  if err != nil {
    return err
  }

  return nil

}
  • 写回答

1条回答 默认 最新

  • dongtan8532 2017-03-24 23:43
    关注

    Solution was to remove the

    for { 
      continue 
    }
    

    at the end of the main function

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 shape_predictor_68_face_landmarks.dat
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 对于相关问题的求解与代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 信号傅里叶变换在matlab上遇到的小问题请求帮助
  • ¥15 保护模式-系统加载-段寄存器
  • ¥15 电脑桌面设定一个区域禁止鼠标操作
  • ¥15 求NPF226060磁芯的详细资料