duankong8998 2017-03-24 15:39
浏览 273
已采纳

Golang Gorilla Websocket在120秒后停止接收信息

I'm currently trying to connect to the CEX.IO bitcoin exchange's websocket, but have been having issues not only with CEX.IO but with others too. All of my connections drop around the 120-second mark which makes me think there is some TTL problem going on. The Process() goroutine in the main package ends up just hanging and waiting for data from the readLoop which just stops receiving data. I've included some read-only API keys in the code so you can test if you'd like.

package main

import (
  "fmt"
  "bitbucket.org/tradedefender/cryptocurrency/exchange-connector/cexio"
  "github.com/shopspring/decimal"
  "encoding/json"
  "time"
)

type OrderBook struct {
  Asks []Ask
  Bids []Bid
}

type Ask struct {
  Rate    decimal.Decimal
  Amount  decimal.Decimal
}

type Bid struct {
  Rate    decimal.Decimal
  Amount  decimal.Decimal
}

func main() {
  cexioConn := new(cexio.Connection)

  err := cexioConn.Connect()
  if err != nil {
    fmt.Errorf("error: %s", err.Error())
  }

  err = cexioConn.Authenticate("TLwYkktLf7Im6nqSKt6UO1IrU", "9ImOJcR7Qj3LMIyPCzky0D7WE")
  if err != nil {
    fmt.Errorf("error: %s", err.Error())
  }

  readChannel := make(chan cexio.IntraAppMessage, 25)

  go cexioConn.ReadLoop(readChannel)

  processor := Processor{
    WatchPairs: [][2]string{
      [2]string{
        "BTC", "USD",
      },
    },
    conn: cexioConn,
  }

  go processor.Process(readChannel)

  // LOL
  for {
    continue
  }

}

type Processor struct {
  WatchPairs [][2]string
  conn *cexio.Connection
}

func (p *Processor) Process(ch <-chan cexio.IntraAppMessage) {

  p.conn.SubscribeToOrderBook(p.WatchPairs[0])

  pingTimer := time.Now().Unix()
  for {

    fmt.Printf("(%v)
", time.Now().Unix())

    if (time.Now().Unix() - pingTimer) >= 10 {
      fmt.Println("sending ping")
      p.conn.SendPing()
      pingTimer = time.Now().Unix()
    }

    readMsg := <- ch
    output, _ := json.Marshal(readMsg.SocketMessage)
    fmt.Println(string(output))

    if readMsg.SocketMessage.Event == "ping" {
      fmt.Println("sending pong")
      p.conn.SendPong()
      pingTimer = time.Now().Unix()
    }

  }
}

Below is the connector to the cexio websocket. Here is a link to their API: https://cex.io/websocket-api

package cexio

import (
  "github.com/gorilla/websocket"
  //"github.com/shopspring/decimal"
  "github.com/satori/go.uuid"
  "encoding/hex"
  "encoding/json"
  "crypto/hmac"
  "crypto/sha256"
  "bytes"
  "strconv"
  "time"
  "fmt"
)

const Url = "wss://ws.cex.io/ws/"

type Connection struct {
  conn *websocket.Conn
}

type IntraAppMessage struct {
  SocketMessage   GenericMessage
  ProgramMessage  ProgramMessage
}

type GenericMessage struct {
  Event   string      `json:"e"`
  Data    interface{} `json:"data"`
  Auth    AuthData    `json:"auth,omitempty"`
  Ok      string      `json:"ok,omitempty"`
  Oid     string      `json:"oid,omitempty"`
  Time    int64       `json:"time,omitempty"`
}

type ProgramMessage struct {
  Error   string
}

type AuthData struct {
  Key       string  `json:"key"`
  Signature string  `json:"signature"`
  Timestamp int64   `json:"timestamp"`
}

type OrderBookSubscribeData struct {
  Pair      [2]string   `json:"pair"`
  Subscribe bool        `json:"subscribe"`
  Depth     int         `json:"depth"`
}

func (c *Connection) SendPong() error {

  pongMsg := GenericMessage{
    Event: "pong",
  }

  err := c.conn.WriteJSON(pongMsg)
  if err != nil {
    return nil
  }

  deadline := time.Now().Add(15*time.Second)

  err = c.conn.WriteControl(websocket.PongMessage, nil, deadline)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) SendPing() error {

  pingMsg := GenericMessage{
    Event: "get-balance",
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(pingMsg)
  if err != nil {
    return err
  }

  deadline := time.Now().Add(15*time.Second)

  err = c.conn.WriteControl(websocket.PingMessage, nil, deadline)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) Connect() error {
  dialer := *websocket.DefaultDialer
  wsConn, _, err := dialer.Dial(Url, nil)
  if err != nil {
    return err
  }

  c.conn = wsConn
  //c.conn.SetPingHandler(c.HandlePing)

  for {

    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      c.Disconnect()
      return err
    }

    fmt.Println(string(msgBytes))

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      c.Disconnect()
      return err
    }

    if m.Event != "connected" {
      c.Disconnect()
      return err
    } else {
      break
    }

  }

  return nil
}

func (c *Connection) Disconnect() error {
  return c.conn.Close()
}

func (c *Connection) ReadLoop(ch chan<- IntraAppMessage) {
  for {

    fmt.Println("starting new read")

    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      ch <- IntraAppMessage{
        ProgramMessage: ProgramMessage{
          Error: err.Error(),
        },
      }
      continue
    }

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      ch <- IntraAppMessage{
        ProgramMessage: ProgramMessage{
          Error: err.Error(),
        },
      }
      continue
    }

    ch <- IntraAppMessage{
      SocketMessage: m,
    }

  }
}

func CreateSignature(timestamp int64, key, secret string) string {
  secretBytes := []byte(secret)
  h := hmac.New(sha256.New, secretBytes)

  var buffer bytes.Buffer
  buffer.WriteString(strconv.FormatInt(timestamp, 10))
  buffer.WriteString(key)

  h.Write(buffer.Bytes())

  return hex.EncodeToString(h.Sum(nil))
}

func (c *Connection) Authenticate(key, secret string) error {

  timestamp := time.Now().Unix()
  signature := CreateSignature(timestamp, key, secret)

  var authMsg GenericMessage
  authMsg.Event = "auth"
  authMsg.Auth = AuthData{
    Key: key,
    Signature: signature,
    Timestamp: timestamp,
  }

  err := c.conn.WriteJSON(authMsg)
  if err != nil {
    return err
  }

  for {
    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      c.Disconnect()
      return err
    }

    fmt.Println(string(msgBytes))

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      c.Disconnect()
      return err
    }

    if m.Event != "auth" && m.Ok != "ok" {
      c.Disconnect()
      return err
    } else {
      break
    }
  }

  return nil

}

func (c *Connection) SubscribeToOrderBook(pair [2]string) error {

  sendMsg := GenericMessage{
    Event: "order-book-subscribe",
    Data: OrderBookSubscribeData{
      Pair: pair,
      Subscribe: true,
      Depth: 0,
    },
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(sendMsg)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) GetBalance() error {

  sendMsg := GenericMessage{
    Event: "get-balance",
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(sendMsg)
  if err != nil {
    return err
  }

  return nil

}
  • 写回答

1条回答 默认 最新

  • dongtan8532 2017-03-24 23:43
    关注

    Solution was to remove the

    for { 
      continue 
    }
    

    at the end of the main function

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 C语言PE文件遍历节表
  • ¥30 backtrader对于期货交易的现金和资产计算的问题
  • ¥15 求C# .net4.8小报表工具
  • ¥15 安装虚拟机时出现问题
  • ¥15 Selenium+docker Chrome不能运行
  • ¥15 mac电脑,安装charles后无法正常抓包
  • ¥18 visio打开文件一直显示文件未找到
  • ¥15 请教一下,openwrt如何让同一usb储存设备拔插后设备符号不变?
  • ¥50 使用quartz框架进行分布式任务定时调度,启动了两个实例,但是只有一个实例参与调度,另外一个实例没有参与调度,不知道是为什么?请各位帮助看一下原因!!
  • ¥50 怎么获取Ace Editor中的python代码后怎么调用Skulpt执行代码