dougekui1518 2015-12-28 18:47
浏览 72

带有通道的Golang测试不会退出

The following Golang test never exits. I suspect it has something to do with a channel deadlock but being a go-noob, I am not very certain.

const userName = "xxxxxxxxxxxx"

func TestSynchroninze(t *testing.T) {
    c, err := channel.New(github.ChannelName, authToken)
    if err != nil {
        t.Fatalf("Could not create channel: %s", err)
        return
    }

    state := channel.NewState(nil)
    ctx := context.Background()
    ctx = context.WithValue(ctx, "userId", userName)
    user := api.User{}

    output, errs := c.Synchronize(state, ctx)

    if err = <-errs; err != nil {
        t.Fatalf("Error performing synchronize: %s", err)
        return
    }

    for o := range output {
        switch oo := o.Data.(type) {
        case api.User:
            user = oo
            glog.Infof("we have a USER %s
", user)
        default:
            t.Errorf("Encountered unexpected data type: %T", oo)
        }
    }
}

Here are the methods being tested.

type github struct {
    client *api.Client
}

func newImplementation(t auth.UserToken) implementation.Implementation {
    return &github{client: api.NewClient(t)}
}

// -------------------------------------------------------------------------------------

const (
    kLastUserFetch = "lastUserFetch"
)

type synchronizeFunc func(implementation.MutableState, chan *implementation.Output, context.Context) error

// -------------------------------------------------------------------------------------

    func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
        output := make(chan *implementation.Output)
        errors := make(chan error, 1) // buffer allows preflight errors

        // Close output channels once we're done
        defer func() {
            go func() {
                // wg.Wait()

                close(errors)
                close(output)
            }()
        }()

        err := g.fetchUser(state, output, ctx)
        if err != nil {
            errors <- err
        }

        return output, errors
    }

func (g *github) fetchUser(state implementation.MutableState, output chan *implementation.Output, ctx context.Context) error {
    var err error

    var user = api.User{}
    userId, _ := ctx.Value("userId").(string)
    user, err = g.client.GetUser(userId, ctx.Done())

    if err == nil {
        glog.Info("No error in fetchUser")
        output <- &implementation.Output{Data: user}
        state.SetTime(kLastUserFetch, time.Now())
    }

    return err
}

func (c *Client) GetUser(id string, quit <-chan struct{}) (user User, err error) {
    // Execute request
    var data []byte
    data, err = c.get("users/"+id, nil, quit)
    glog.Infof("USER DATA %s", data)

    // Parse response
    if err == nil && len(data) > 0 {
        err = json.Unmarshal(data, &user)

        data, _ = json.Marshal(user)
    }

    return
}

Here is what I see in the console (most of the user details removed)

I1228 13:25:05.291010   21313 client.go:177] GET https://api.github.com/users/xxxxxxxx
I1228 13:25:06.010085   21313 client.go:36] USER DATA {"login":"xxxxxxxx","id":00000000,"avatar_url":"https://avatars.githubusercontent.com/u/0000000?v=3",...}
I1228 13:25:06.010357   21313 github.go:90] No error in fetchUser

==========EDIT=============

Here is the relevant portion of the api package.

package api

type Client struct {
    authToken auth.UserToken
    http      *http.Client
}

func NewClient(authToken auth.UserToken) *Client {
    return &Client{
        authToken: authToken,
        http:      auth.NewClient(authToken),
    }
}




// -------------------------------------------------------------------------------------
type User struct {
    Id             int    `json:"id,omitempty"`
    Username       string `json:"login,omitempty"`
    Email          string `json:"email,omitempty"`
    FullName       string `json:"name,omitempty"`
    ProfilePicture string `json:"avatar_url,omitempty"`
    Bio            string `json:"bio,omitempty"`
    Website        string `json:"blog,omitempty"`
    Company        string `json:"company,omitempty"`
}

And the channel package

package channel

type Channel struct {
    implementation.Descriptor
    imp implementation.Implementation
}

// New returns a channel implementation with a given name and auth token.
func New(name string, token auth.UserToken) (*Channel, error) {
    if desc, ok := implementation.Lookup(name); ok {
        if imp := implementation.New(name, token); imp != nil {
            return &Channel{Descriptor: desc, imp: imp}, nil
        }
    }

    return nil, ErrInvalidChannel
}

and the implementation package...

package implementation

import "golang.org/x/net/context"

// -------------------------------------------------------------------------------------

// Implementation is the interface implemented by subpackages.
type Implementation interface {
    // Synchronize performs a synchronization using the given state. A context parameters
    // is provided to provide cancellation as well as implementation-specific behaviors.
    //
    // If a fatal error occurs (see package error definitions), the state can be discarded
    // to prevent the persistence of an invalid state.
    Synchronize(state MutableState, ctx context.Context) (<-chan *Output, <-chan error)

    // FetchDetails gets details for a given timeline item. Any changes to the TimelineItem
    // (including the Meta value) will be persisted.
    FetchDetails(item *TimelineItem, ctx context.Context) (interface{}, error)
}

======Edit #2=======

This is the original Synchronize method. I removed some details in my testing to try and simplify the problem. By removing a go func call, I believe I introduced a new problem which could be confusing things.

Here is the original Synchronize method. There are some things with Wait Groups and a function array containing a single function because this method will eventually be synchronizing multiple functions.

func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
    wg := sync.WaitGroup{}
    output := make(chan *implementation.Output)
    errors := make(chan error, 1) // buffer allows preflight errors

    // Close output channels once we're done
    defer func() {
        go func() {
            wg.Wait()

            close(errors)
            close(output)
        }()
    }()

    // Perform fetch functions in separate routines
    funcs := []synchronizeFunc{
        g.fetchUser,
    }

    for _, f := range funcs {
        wg.Add(1)

        go func(f synchronizeFunc) {
            defer wg.Done()

            if err := f(state, output, ctx); err != nil {
                errors <- err
            }
        }(f)
    }

    glog.Info("after go sync...")

    return output, errors
}
  • 写回答

1条回答 默认 最新

  • doupi1532 2015-12-28 19:45
    关注

    I think the two problems are in

         output <- &implementation.Output{Data: user}
    

    the channel does not have a buffer. It will block until some other goroutine reads from it. But in your code is the same goroutine so it will block.

    and second:

        // Close output channels once we're done
        defer func() {
            go func() {
                // wg.Wait()
    
                close(errors)
                close(output)
            }()
        }()
    

    you launch a go routine when the routine exits. The goroutine is scheduled, the function returns but it never calls the goroutine.

    I would suggest to unify all that logic in one:

     func (g *github) Synchronize(state implementation.MutableState, ctx context.Context) (<-chan *implementation.Output, <-chan error) {
        output := make(chan *implementation.Output)
        errors := make(chan error, 1) // buffer allows preflight errors
    
        go func() {
          defer close(output)
          defer close(errors)
          err := g.fetchUser(state, output, ctx)
          if err != nil {
            errors <- err
          }
       }()        
       return output, errors
      }
    
    评论

报告相同问题?

悬赏问题

  • ¥30 帮我写一段可以读取LD2450数据并计算距离的Arduino代码
  • ¥15 C#调用python代码(python带有库)
  • ¥15 矩阵加法的规则是两个矩阵中对应位置的数的绝对值进行加和
  • ¥15 活动选择题。最多可以参加几个项目?
  • ¥15 飞机曲面部件如机翼,壁板等具体的孔位模型
  • ¥15 vs2019中数据导出问题
  • ¥20 云服务Linux系统TCP-MSS值修改?
  • ¥20 关于#单片机#的问题:项目:使用模拟iic与ov2640通讯环境:F407问题:读取的ID号总是0xff,自己调了调发现在读从机数据时,SDA线上并未有信号变化(语言-c语言)
  • ¥20 怎么在stm32门禁成品上增加查询记录功能
  • ¥15 Source insight编写代码后使用CCS5.2版本import之后,代码跳到注释行里面