dseigqk7443 2017-09-10 12:34
浏览 69

Go中的并行saxpy实现无法在内核之间很好地扩展

I'm trying to write an implementation of saxpy that is both blocked and computed in parallel using the 8 cores available on my machine. I started with the assumption that vectors x and y small enough to fit into the L1 cache of my machine (256 kB, split into 128 kB data and 128 kB code) can be computed serially. To test this assumption, I wrote two implementations of saxpy: a blocked serial version (BSS) and a blocked parallel version (BPS). The blocking algorithm is used only when the vectors are longer than 4096 elements. The implementations are as follows:

// cachecap is the block length, in float64 elements, used by the blocked
// saxpy routines: 32 KiB divided by the 8-byte size of a float64 (4096).
const cachecap = (32 << 10) / 8
// blocked_serial_saxpy computes z = a*x + b*y like serial_saxpy, but walks the
// n = len(z)/incz logical elements in blocks of at most cachecap elements,
// calling serial_saxpy once per block on the calling goroutine.
//
// Increments are honoured per vector: the block of logical elements [lo, hi)
// reads x from index lo*incx, y from lo*incy, and writes z from lo*incz.
// Inputs of at most cachecap logical elements, and non-positive increments,
// are passed straight to serial_saxpy (which panics on bad increments).
func blocked_serial_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
	// The increment checks short-circuit before the division, so incz <= 0
	// reaches serial_saxpy's panic instead of a divide-by-zero here.
	if incx <= 0 || incy <= 0 || incz <= 0 || len(z)/incz <= cachecap {
		serial_saxpy(a, x, incx, b, y, incy, z, incz)
		return
	}

	n := len(z) / incz
	for lo := 0; lo < n; lo += cachecap {
		hi := lo + cachecap
		if hi > n {
			hi = n
		}
		// Each sub-slice is just long enough for hi-lo strided elements, so
		// serial_saxpy derives exactly hi-lo elements from len(zb)/incz.
		xb := x[lo*incx : (hi-1)*incx+1]
		yb := y[lo*incy : (hi-1)*incy+1]
		zb := z[lo*incz : hi*incz]
		serial_saxpy(a, xb, incx, b, yb, incy, zb, incz)
	}
}
// blocked_parallel_saxpy computes z = a*x + b*y by splitting the
// n = len(z)/incz logical elements into blocks of at most cachecap elements
// and handing those blocks over a channel to a pool of at most
// runtime.GOMAXPROCS(0) worker goroutines, each running serial_saxpy per
// block. Blocks write disjoint ranges of z, so workers need no further
// synchronization. It returns once every block has been processed.
//
// Inputs of at most cachecap logical elements, and non-positive increments,
// are passed straight to serial_saxpy (which panics on bad increments).
func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
	// Increment checks short-circuit before the division by incz.
	if incx <= 0 || incy <= 0 || incz <= 0 || len(z)/incz <= cachecap {
		serial_saxpy(a, x, incx, b, y, incy, z, incz)
		return
	}

	n := len(z) / incz
	nblocks := (n + cachecap - 1) / cachecap // ceil: no empty trailing block
	nworkers := runtime.GOMAXPROCS(0)
	if nblocks < nworkers {
		nworkers = nblocks
	}

	// Buffered for every block, so filling it never blocks; block bounds are
	// logical element indices, not raw offsets into z.
	blocks := make(chan block, nblocks)
	for lo := 0; lo < n; lo += cachecap {
		hi := lo + cachecap
		if hi > n {
			hi = n
		}
		blocks <- block{lo, hi}
	}
	close(blocks)

	var wg sync.WaitGroup
	wg.Add(nworkers)
	for w := 0; w < nworkers; w++ {
		go func() {
			defer wg.Done()
			for blk := range blocks {
				lo, hi := blk.beg, blk.end
				// Slice each vector by its own increment so strided inputs
				// stay aligned; each slice holds exactly hi-lo elements.
				serial_saxpy(a, x[lo*incx:(hi-1)*incx+1], incx,
					b, y[lo*incy:(hi-1)*incy+1], incy,
					z[lo*incz:hi*incz], incz)
			}
		}()
	}
	wg.Wait()
}

// serial_saxpy computes z = a*x + b*y element-wise on the calling goroutine.
//
// It processes n = len(z)/incz elements: element i is read from x[i*incx] and
// y[i*incy] and written to z[i*incz]. It panics if any increment is zero or
// negative. For (a, b) equal to (1, 1), (0, 1), (1, 0) or (0, 0) a specialised
// loop (or copy) is used; in the 0/1 cases the vector whose coefficient is 0
// is not read, and in the (0, 0) case z is filled with zeros.
func serial_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
	if incx <= 0 || incy <= 0 || incz <= 0 {
		panic("AxpBy: zero or negative increments not supported")
	}

	n := len(z) / incz
	if incx == 1 && incy == 1 && incz == 1 {
		switch {
		case a == 1 && b == 1:
			for i := 0; i < n; i++ {
				z[i] = x[i] + y[i]
			}
		case a == 0 && b == 1:
			copy(z, y)
		case a == 1 && b == 0:
			copy(z, x)
		case a == 0 && b == 0:
			// 0*x + 0*y is zero: clear z rather than leave stale contents.
			for i := 0; i < n; i++ {
				z[i] = 0
			}
		default:
			for i := 0; i < n; i++ {
				z[i] = a*x[i] + b*y[i]
			}
		}
		return
	}

	// Unequal increments, or equal increments other than 1.
	ix, iy, iz := 0, 0, 0
	switch {
	case a == 1 && b == 1:
		for i := 0; i < n; i, ix, iy, iz = i+1, ix+incx, iy+incy, iz+incz {
			z[iz] = x[ix] + y[iy]
		}
	case a == 0 && b == 1:
		for i := 0; i < n; i, iy, iz = i+1, iy+incy, iz+incz {
			z[iz] = y[iy]
		}
	case a == 1 && b == 0:
		for i := 0; i < n; i, ix, iz = i+1, ix+incx, iz+incz {
			z[iz] = x[ix]
		}
	case a == 0 && b == 0:
		// Same as the unit-stride case: the result is all zeros.
		for i := 0; i < n; i, iz = i+1, iz+incz {
			z[iz] = 0
		}
	default:
		for i := 0; i < n; i, ix, iy, iz = i+1, ix+incx, iy+incy, iz+incz {
			z[iz] = a*x[ix] + b*y[iy]
		}
	}
}

I then wrote benchmarks for the three functions blocked_serial_saxpy, blocked_parallel_saxpy and serial_saxpy. The following images show the benchmark results for vector sizes 1e3, 1e4, 1e5, 2e5, 3e5, 4e5, 6e5, 8e5 and 1e6, respectively: [image: Saxpy Benchmarks Part 1] [image: Saxpy Benchmarks Part 2]

To visualize the performance of the blocked_parallel_saxpy implementation, I plotted the results; this is what I obtained: [image: Saxpy Performance Plot] Looking at the plot, I wonder why I am not seeing a parallel speedup, given that all the CPUs are in use at 100% during the blocked_parallel_saxpy benchmark. The Task Manager screenshot is below: [image: Saxpy CPU Usage]

Could someone help me understand what's going on here? Is what I'm seeing a symptom of a problem, or is it expected behavior? If it's the former, is there a way to fix it?

Edit: I have modified the blocked_parallel_saxpy code as follows. I divide the total number of blocks (nblocks) so that nworkers goroutines compute nworkers blocks in parallel. In addition, I have removed the channel. I benchmarked the code, and it performs identically to the channel-based parallel implementation, which is why I haven't attached the benchmarks.

func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    zn := len(z)
    if zn <= cachecap {
        serial_saxpy(a, x, incx, b, y, incy, z, incz)
        return
    }

    nblocks := zn/cachecap + 1
    nworkers := runtime.GOMAXPROCS(0)
    if nblocks < nworkers {
        nworkers = nblocks
    }

    var wg sync.WaitGroup
    for i := 0; i < nworkers; i++ {
        for j := 0; j < nworkers && (i+j) < nblocks; j++ {
            wg.Add(1)
            go func(i, j int) {
                defer wg.Done()
                a, b := a, b
                incx, incy, incz := incx, incy, incz
                k := i + j
                beg := k * cachecap
                end := (k + 1) * cachecap
                if end >= zn {
                    end = zn
                }
                serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
            }(i, j)
        }
    wg.Wait()
}

Edit 2: I have written another version of blocked_parallel_saxpy, again without channels. This time I spawn NumCPU goroutines, each processing nblocks/nworkers + 1 blocks, where each block is cachecap elements long. Even here, the code performs identically to the previous two implementations.

func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    zn := len(z)
    if zn <= cachecap {
        serial_saxpy(a, x, incx, b, y, incy, z, incz)
        return
    }

    nblocks := zn/cachecap + 1
    nworkers := runtime.GOMAXPROCS(runtime.NumCPU())
    if nblocks < nworkers {
        nworkers = nblocks
    }

    k := nblocks/nworkers + 1
    var wg sync.WaitGroup
    wg.Add(nworkers)
    for i := 0; i < nworkers; i++ {
        go func(i int) {
            defer wg.Done()
            for j := 0; j < k && (j+i*k) < nblocks; j++ {
                beg := (j + i*k) * cachecap
                end := beg + cachecap
                if end > zn {
                    end = zn
                }
                //fmt.Printf("i:%d, j:%d, k:%d, [beg:end]=[%d:%d]
", i, j, k, beg, end)
                serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
            }
        }(i)
    }

    wg.Wait()
}
  • 写回答

2条回答 默认 最新

  • ds355020 2017-09-10 14:06
    关注

    I'd try a parallel version without channels, where each worker computes every 8th block, without coordination.

    评论

报告相同问题?

悬赏问题

  • ¥15 素材场景中光线烘焙后灯光失效
  • ¥15 请教一下各位,为什么我这个没有实现模拟点击
  • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 保护模式-系统加载-段寄存器