douchi2022 2019-03-25 14:54
Views: 165
Accepted

How to handle buffered read/write streams to peers in Go using libp2p?

I am following this tutorial:

https://github.com/libp2p/go-libp2p-examples/tree/master/chat-with-mdns

In short, it:

  1. configures a p2p host
  2. sets a default handler function for incoming connections (3. not necessary)
  3. and opens a stream to the connecting peers:

stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))

Afterwards, a buffered read/write variable is created on top of the stream:

rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

Now this stream is used to send and receive data between the peers. This is done using two goroutine functions that have rw as an input:

go writeData(rw)
go readData(rw)

My problems are:

  1. I want to send data to my peers and need feedback from them: e.g. in rw there is a question and they need to answer yes/no. How can I transfer back this answer and process it (enable some interaction)?

  2. The data I want to send in rw is not always the same. Sometimes it's a string containing only a name, sometimes it's a string containing a whole block etc. How can I distinguish?

I thought about these solutions. But I am new to golang, so maybe you have a better one:

  • do I need a new stream for every different content: stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))

  • do I need to open more buffered rw variables for every different content: rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

  • are there any other solutions?

Thank you for any help to solve this!!


1 answer

  • doutang8098 2019-03-25 16:42

    This is what readData does in your tutorial:

    func readData(rw *bufio.ReadWriter) {
        for {
            str, err := rw.ReadString('\n')
            if err != nil {
                fmt.Println("Error reading from buffer")
                panic(err)
            }
    
            if str == "" {
                return
            }
            if str != "\n" {
                // Green console colour:    \x1b[32m
                // Reset console colour:    \x1b[0m
                fmt.Printf("\x1b[32m%s\x1b[0m> ", str)
            }
    
        }
    }
    

    It basically reads from the stream until it finds a \n, which is the newline character, and prints what it read to stdout.

    The writeData:

    func writeData(rw *bufio.ReadWriter) {
        stdReader := bufio.NewReader(os.Stdin)
    
        for {
            fmt.Print("> ")
            sendData, err := stdReader.ReadString('\n')
            if err != nil {
                fmt.Println("Error reading from stdin")
                panic(err)
            }
    
            _, err = rw.WriteString(fmt.Sprintf("%s\n", sendData))
            if err != nil {
                fmt.Println("Error writing to buffer")
                panic(err)
            }
            err = rw.Flush()
            if err != nil {
                fmt.Println("Error flushing buffer")
                panic(err)
            }
        }
    }
    

    It reads data from stdin, so you can type messages, and writes this to the rw and flushes it. This kind of enables a sort of tty chat. If it works correctly you should be able to start at least two peers and communicate through stdin.

    You shouldn't create a new rw for each new kind of content. You can reuse the existing one until you close the stream. In the tutorial's code, a new rw is created for each new peer.


    Now, a TCP stream does not work like an HTTP request, where each request has a response corresponding to it. So if you want to send something and get the response to that specific question, you could send a message in this format:

    [8 bytes unique ID][content of the message]
    
    

    And when you receive it, you parse it, prepare the response and send it with the same format, so that you can match messages, creating a sort of request/response communication.
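
    To make the matching concrete, here is a minimal sketch of the bookkeeping on the asking side. It is only an illustration under my own assumptions: the names pending, register and resolve are hypothetical and are not part of libp2p or the tutorial. The idea is that every question gets a fresh ID and a channel, and the read loop delivers each incoming answer to the channel registered under the same ID.

```go
package main

import "sync"

// pending maps a request ID to the channel waiting for its response.
// All names in this sketch are hypothetical, not libp2p API.
type pending struct {
	mu      sync.Mutex
	nextID  uint64
	waiters map[uint64]chan []byte
}

func newPending() *pending {
	return &pending{waiters: make(map[uint64]chan []byte)}
}

// register reserves a fresh ID and returns it together with the channel
// on which the matching response will be delivered.
func (p *pending) register() (uint64, <-chan []byte) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.nextID++
	ch := make(chan []byte, 1) // buffered so resolve never blocks
	p.waiters[p.nextID] = ch
	return p.nextID, ch
}

// resolve hands a response to whoever registered id. It reports false
// when no request with that ID is outstanding.
func (p *pending) resolve(id uint64, content []byte) bool {
	p.mu.Lock()
	ch, ok := p.waiters[id]
	delete(p.waiters, id)
	p.mu.Unlock()
	if ok {
		ch <- content
	}
	return ok
}
```

    The goroutine that asks the question would call register, send the message with the returned ID, and then wait on the channel (ideally with a timeout). The goroutine that reads from the stream would call resolve with the ID and content of every answer it receives.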

    The sending and receiving sides can look something like this:

    func sendMsg(rw *bufio.ReadWriter, id int64, content []byte) error {
            // allocate the slice with the correct size:
            // 8 (ID) + size of the message + 1 (new line)
            msg := make([]byte, 8+len(content)+1)
    
            // write the ID into the first 8 bytes
            binary.LittleEndian.PutUint64(msg, uint64(id))
    
            // add content to msg, right after the ID
            copy(msg[8:], content)
    
            // add new line at the end; the reader stops at the first '\n' byte,
            // so neither the encoded ID nor the content may contain one
            msg[len(msg)-1] = '\n'
    
            // write msg to stream
            _, err := rw.Write(msg)
            if err != nil {
                fmt.Println("Error writing to buffer")
                return err
            }
            err = rw.Flush()
            if err != nil {
                fmt.Println("Error flushing buffer")
                return err
            }
            return nil
    }
    
    func readMsg(rw *bufio.ReadWriter) {
        for {
            // read bytes until new line
            msg, err := rw.ReadBytes('\n')
            if err != nil {
                // a read error (including io.EOF) will not go away, so stop
                fmt.Println("Error reading from buffer:", err)
                return
            }
    
            // a valid message holds at least the 8-byte ID and the new line
            if len(msg) < 9 {
                fmt.Println("Message too short, skipping")
                continue
            }
    
            // get the id
            id := int64(binary.LittleEndian.Uint64(msg[0:8]))
    
            // get the content, last index is len(msg)-1 to remove the new line char
            content := string(msg[8 : len(msg)-1])
    
            if content != "" {
                // we print [message ID] content
                fmt.Printf("[%d] %s\n", id, content)
            }
    
            // here you could parse your message and prepare a response;
            // prepareResponse is yours to write and should return ([]byte, error)
            response, err := prepareResponse(content)
            if err != nil {
                fmt.Println("Err while preparing response: ", err)
                continue
            }
    
            if err := sendMsg(rw, id, response); err != nil {
                fmt.Println("Err while sending response: ", err)
                continue
            }
        }
    }
    

    Hope this helps.
