douliangpo0128 2018-06-24 22:00

Need help understanding how libchan works

I'm trying to use the libchan library to send messages between machines using a go channel-like transport.

From what I've gathered, the rough idea is this:

  1. You have a SPDY client that sends a serialized command object to an address over tcp. This command object contains a libchan channel called a Pipe that the response is sent through.
  2. When the server receives an incoming connection, it waits for a command object. When it gets one, it sends a response through the Pipe contained in the object.

Here's my point of confusion. For a channel to persist between two machines, they'd have to share memory or at least share an abstraction that connects the two of them. From reading through the libchan codebase, I have no idea how this could be possible.

Here's a snippet from the example in the repo:

// client

    receiver, remoteSender := libchan.Pipe()
    command := &RemoteCommand{
        Cmd:        os.Args[1],
        Args:       os.Args[2:],
        Stdin:      os.Stdin,
        Stdout:     os.Stdout,
        Stderr:     os.Stderr,
        StatusChan: remoteSender,
    }

    err = sender.Send(command)
    if err != nil {
        log.Fatal(err)
    }
    response := &CommandResponse{}
    err = receiver.Receive(response)
    if err != nil {
        log.Fatal(err)
    }

    os.Exit(response.Status)

and the server:

// server

    t := spdy.NewTransport(p)

    go func() {
        for {
            receiver, err := t.WaitReceiveChannel()
            if err != nil {
                log.Print("receiver error")
                log.Print(err)
                break
            }
            log.Print("about to spawn receive proc")
            go func() {
                for {
                    command := &RemoteReceivedCommand{}
                    err := receiver.Receive(command)
                    // (error check and command execution omitted from this snippet;
                    // res is presumably the error returned by running the command)
                    returnResult := &CommandResponse{}
                    if res != nil {
                        if exiterr, ok := res.(*exec.ExitError); ok {
                            returnResult.Status = exiterr.Sys().(syscall.WaitStatus).ExitStatus()
                        } else {
                            log.Print("res")
                            log.Print(res)
                            returnResult.Status = 10
                        }
                    }
                    // Reply through the channel that arrived inside the command.
                    err = command.StatusChan.Send(returnResult)
                    // (snippet ends here; the rest of the loop is not shown)

The point I'm trying to home in on is this:

libchan.Pipe()

According to the source, this returns a channel. One reference is kept on the client, and the other is sent to the server. This channel is then used to pipe values from the latter to the former. How does this actually work in practice?

Full code for client and server


1 answer

  • dousou2897 2018-06-24 23:07

    First, it's good to know that all Pipe() does is make a channel and return the in-memory sender/receiver pair.

    From inmem.go:

    // Pipe returns an inmemory Sender/Receiver pair.
    func Pipe() (Receiver, Sender) {
        c := make(chan interface{})
        return pReceiver(c), pSender(c)
    }
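
To make the "one channel, two ends" idea concrete, here is a minimal sketch using only the standard library. The names `chanSender`, `chanReceiver`, and `newPipe` are made up for illustration, and the `Receive` signature is simplified (it returns the value instead of filling in a pointer), so treat it as an analogy to `Pipe()` and not as libchan's actual code.

```go
package main

import "fmt"

// Sender and Receiver are simplified stand-ins for the libchan interfaces.
type Sender interface {
	Send(v interface{}) error
}

type Receiver interface {
	Receive() (interface{}, error)
}

// chanSender and chanReceiver are two restricted views of one Go channel.
type chanSender chan<- interface{}
type chanReceiver <-chan interface{}

// Send blocks until a receiver takes the value.
func (s chanSender) Send(v interface{}) error {
	s <- v
	return nil
}

// Receive blocks until a value arrives or the channel is closed.
func (r chanReceiver) Receive() (interface{}, error) {
	v, ok := <-r
	if !ok {
		return nil, fmt.Errorf("channel closed")
	}
	return v, nil
}

// newPipe (hypothetical name) has the same shape as Pipe():
// a single channel wrapped as a receive end and a send end.
func newPipe() (Receiver, Sender) {
	c := make(chan interface{})
	return chanReceiver(c), chanSender(c)
}

func main() {
	r, s := newPipe()
	go s.Send("hello")
	v, err := r.Receive()
	fmt.Println(v, err)
}
```

Both ends live in the same process here, which is why this pair on its own cannot span two machines.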
    

    Then you can look in inmem_test.go for a simple end-to-end example.

    This struct is the equivalent of RemoteCommand from the demo.

    type InMemMessage struct {
        Data   string
        Stream io.ReadWriteCloser
        Ret    Sender
    }
    

    In TestInmemRetPipe(), a simple client and server are created.

    The client creates a local sender/receiver pair using Pipe(), while the server simply uses the libchan.Sender interface in the InMemMessage struct.

    Note that the client and server are functions which receive a Sender or Receiver as an argument respectively. More on this in the next code snippet.

    func TestInmemRetPipe(t *testing.T) {
        client := func(t *testing.T, w Sender) {
            ret, retPipe := Pipe()
            message := &InMemMessage{Data: "hello", Ret: retPipe}
    
            err := w.Send(message)
            if err != nil {
                t.Fatal(err)
            }
            msg := &InMemMessage{}
            err = ret.Receive(msg)
            if err != nil {
                t.Fatal(err)
            }
    
            if msg.Data != "this better not crash" {
                t.Fatalf("%#v", msg)
            }
    
        }
        server := func(t *testing.T, r Receiver) {
            msg := &InMemMessage{}
            err := r.Receive(msg)
            if err != nil {
                t.Fatal(err)
            }
    
            if msg.Data != "hello" {
                t.Fatalf("Wrong message:\n\tExpected: %s\n\tActual: %s", "hello", msg.Data)
            }
            if msg.Ret == nil {
                t.Fatal("Message Ret is nil")
            }
    
            message := &InMemMessage{Data: "this better not crash"}
            if err := msg.Ret.Send(message); err != nil {
                t.Fatal(err)
            }
        }
        SpawnPipeTestRoutines(t, client, server)
    
    }
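
The pattern in this test, a message that carries its own reply channel, is the same one plain Go channels support. The sketch below shows it without libchan; `request`, `serve`, and `ask` are hypothetical names chosen for the example.

```go
package main

import "fmt"

// request carries its own reply channel, much like InMemMessage carries Ret.
type request struct {
	Data string
	Ret  chan<- string
}

// serve reads one request and answers on the channel embedded in it.
func serve(in <-chan request) {
	req := <-in
	req.Ret <- "got " + req.Data
}

// ask sends a request and waits for the reply on a channel it created itself.
func ask(out chan<- request, data string) string {
	ret := make(chan string)
	out <- request{Data: data, Ret: ret}
	return <-ret
}

func main() {
	link := make(chan request)
	go serve(link)
	fmt.Println(ask(link, "hello"))
}
```

The server never creates a channel of its own; it only uses the one it was handed, which is what the server side of the test does with `msg.Ret`.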
    

    SpawnPipeTestRoutines() executes the client and server functions. In this function, another sender/receiver pair is instantiated via Pipe().

    In the demo application, the role Pipe() plays here (connecting the client and server instances) is filled by the network transport instead.

    func SpawnPipeTestRoutines(t *testing.T, s SendTestRoutine, r ReceiveTestRoutine) {
        end1 := make(chan bool)
        end2 := make(chan bool)
    
        receiver, sender := Pipe()
    
        go func() {
            defer close(end1)
            s(t, sender)
            err := sender.Close()
            if err != nil {
                t.Fatalf("Error closing sender: %s", err)
            }
        }()
    
        go func() {
            defer close(end2)
            r(t, receiver)
        }()
        ...
    

    In the demo application, the communication is set up by calls to Transport.NewSendChannel() on the client and Transport.WaitReceiveChannel() on the server, which return a libchan.Sender and a libchan.Receiver respectively. These objects implement the "pipe" over the network.

    From client.go:

    sender, err := transport.NewSendChannel()
    ...
    err = sender.Send(command)
    

    From server.go:

    receiver, err := t.WaitReceiveChannel()
    ...
    err := receiver.Receive(command)
    

    In both cases, the prerequisite transport configuration is done beforehand (i.e. binding to sockets, utilizing TLS, etc.).

    It's probably also worth noting that the spdy library being used is part of the libchan distribution, which is why it provides libchan primitives.
