运行Go异步操作并写入映射

I have this project that tries to run unlimited bigqueries at the same time in Go. The parent project is all Python. I need to be able to keep track of the query results, like in a map.

Input:

{
 'reports_portal': 'select * from reports_portal',
 'billing_portal': 'select * from billing_portal',
}

output:

{
 'reports_portal': [23, 123, 5234, 632],
 'billing_portal': [23, 123, 5234, 632],
}

and so on

these bigqueries need to be run asynchronously as they're very slow (from a UI perspective, an SRE waiting 15-30 seconds for results.

I first try to asynchronously write items to a map:

package main

import (
    "fmt"
)


func add_to_map(m map[string] string, word string) map[string]string {
    added_word := word + " plus more letters"
    m[word] = added_word
    return m
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        go add_to_map(words_map, this_word)
    }
    fmt.Println(words_map)
}

blows up like:

$ go run try_asynchronous.go 
fatal error: concurrent map writes

goroutine 7 [running]:
runtime.throw(0x10b3b96, 0x15)
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/panic.go:596 +0x95 fp=0xc420032eb8 sp=0xc420032e98
runtime.mapassign(0x109ad20, 0xc420016270, 0xc420032fa0, 0x10b3268)
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/hashmap.go:499 +0x667 fp=0xc420032f58 sp=0xc420032eb8
main.add_to_map(0xc420016270, 0x10b1ba0, 0x3, 0x0)
    /tmp/golang-w-python/try_asynchronous.go:10 +0xa3 fp=0xc420032fc0 sp=0xc420032f58
runtime.goexit()
    /usr/local/Cellar/go/1.8.1/libexec/src/runtime/asm_amd64.s:2197 +0x1 fp=0xc420032fc8 sp=0xc420032fc0
created by main.main
    /tmp/golang-w-python/try_asynchronous.go:19 +0xc8

goroutine 1 [runnable]:
fmt.(*pp).fmtString(0xc42001e0c0, 0x10b1f52, 0x7, 0x76)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:424 +0x1a2
fmt.(*pp).printValue(0xc42001e0c0, 0x10953c0, 0xc42000e260, 0x98, 0x76, 0x1)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:729 +0x27aa
fmt.(*pp).printValue(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x15, 0x76, 0x0)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:750 +0x103d
fmt.(*pp).printArg(0xc42001e0c0, 0x109ad20, 0xc420016270, 0x76)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:682 +0x217
fmt.(*pp).doPrintln(0xc42001e0c0, 0xc420045f28, 0x1, 0x1)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:1138 +0xa1
fmt.Fprintln(0x1108140, 0xc42000c018, 0xc420045f28, 0x1, 0x1, 0xc420045ef0, 0xc420045ee0, 0x1087218)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:247 +0x5c
fmt.Println(0xc420045f28, 0x1, 0x1, 0x10b1e6f, 0x6, 0x0)
    /usr/local/Cellar/go/1.8.1/libexec/src/fmt/print.go:257 +0x57
main.main()
    /tmp/golang-w-python/try_asynchronous.go:21 +0x132
exit status 2

based on needing to run many queries at once and trying to keep track of the results by their name, I expected to write to a map during asynchronous. But fatal error: concurrent map writes says you can't.

I don't understand

  1. why not
  2. what I should do the run these bigqueries simultaneously.

EDIT:

The closest thing I have, that returns results, is not asynchronous:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var mutex sync.Mutex
var wg sync.WaitGroup

func random_sleep() {
    r := rand.Intn(3000)
    time.Sleep(time.Duration(r) * time.Millisecond)
}

func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    mutex.Lock()
    defer mutex.Unlock()
    fmt.Println("Before sleep")
    random_sleep()
    m[word] = added_word
    fmt.Println("Added word %v", word)
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        wg.Add(1)
        go add_to_map(words_map, this_word)
    }
    wg.Wait()
    fmt.Println(words_map)
}

Results are wrong:

cchilders:~/work_projects/metricsportal/golang_integration (feature/golang-query) 
$ go run try_async.go 
Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]

cchilders:~/work_projects/metricsportal/golang_integration (feature/golang-query) 
$ go run try_async.go 
Before sleep
Added word %v turtle
Before sleep
Added word %v cat
Before sleep
Added word %v giraffe
Before sleep
Added word %v dog
map[dog:dog plus more letters turtle:turtle plus more letters cat:cat plus more letters giraffe:giraffe plus more letters]

Results should be very fast, no longer than 3 seconds (the max of random I think):

Expectation - 

Before sleep
Before sleep
Before sleep
Before sleep
Added word %v cat
Added word %v giraffe
Added word %v turtle
Added word %v dog
dongwen3410
dongwen3410 我同意,我的预感最好,但从未使用过频道
大约 3 年之前 回复
duanpa2143
duanpa2143 添加的解决方案是合理的,但是,同步数据的另一种方法是将所有查询例程的结果推送到通道中,并使用单个go例程来更新地图。
大约 3 年之前 回复
dongmi1872
dongmi1872 哦,哈哈,我会重构
大约 3 年之前 回复
dsjq6977
dsjq6977 为什么在关键部分调用random_sleep()?-在mutex.Lock()之前调用它,因为您阻止了所有线程。
大约 3 年之前 回复
dsnnvpobaljihv3490
dsnnvpobaljihv3490 同步还不够,请参阅上面的注释。看来我需要频道,该频道必须明确地说“等待”。我的问题是我不太了解Go,也不想花所有时间,而且我的模块不是并发的。我们最大的问题是尝试多线程,我需要使用通道吗?
大约 3 年之前 回复
douyan1613
douyan1613 要扩展@JimB的评论,请查看blog.golang.org/go-maps-in-action和golang.org/pkg/sync
大约 3 年之前 回复
donglan6777
donglan6777 Go中没有任何类型对于并发写入或读写操作是安全的,您始终需要同步。
大约 3 年之前 回复

3个回答



好,让我澄清一些事情并为您提供帮助。</ p>

您无需返回 从这里修改后的地图,因为您的
函数会获得对地图的引用而不是其副本。 (让我们忽略
您完全忽略了返回值的事实)</ p>

  func add_to_map(m map [string] string,word string)map [string] string {
add_word :=单词+“加上更多字母”
m [word] = add_word
return m
}
</ code> </ pre>

接下来的事情是,您需要同步访问 地图。 您可以为此使用
互斥体。</ p>

  import“ sync” 

var互斥锁sync.Mutex //也可以创建为本地变量

func add_to_map(m map [string] string,word string){
add_word:= word +“ +更多字母”
//在这里您可以花很长时间来计算任务并计算结果
//在这里计算
Mutex.Lock()//结果准备好的锁定互斥量
推迟mutex.Unlock()// 当我们从函数返回时解锁互斥锁
m [word] = add_word //结果写入共享映射
}
</ code> </ pre>

请注意,在1.9版中, 并发地图类型。</ p>

编辑</ strong>:
您需要等待所有例程完成,因为您的 main()</ code>现在已在它们之前完成。 您可以使用 WaitGroup </ p>

 < 代码>包main 

import(
“ fmt”
“ sync”

var互斥体同步。Mutex
var wg同步。WaitGroup

func add_to_map(m map [string] string,word 字符串){
延迟wg.Done()
add_word:=单词+“加更多字母”
//在这里做大量工作
//
Mutex.Lock()
延迟Mutex.Unlock()

m [word] = add_word
}

func main(){
words_map:= make(map [string] string)
words:= [] string {“ giraffe”,“ cat”, “ dog”,“ turtle”}
表示_,this_word:=范围词{
wg.Add(1)
go add_to_map(words_map,this_word)
}
wg.Wait()
fmt。 Println(words_map)
}
</ code> </ pre>
</ div>

展开原文

原文

OK let me clarify some things and help you.

You don't need to return a modified map from here because your function gets a reference to map not copy of it. (Let's ignore the fact that you are completely ignoring return value)

func add_to_map(m map[string] string, word string) map[string]string {
    added_word := word + " plus more letters"
    m[word] = added_word
    return m
}

Next thing is that you need to synchronize access to map. You can use mutex for this.

import "sync"

var mutex sync.Mutex //glabal variable but can be created as local also

func add_to_map(m map[string] string, word string) {
    added_word := word + " plus more letters"
    // here you can do long to compute task and calculate result
    // calc here
    mutex.Lock() //result ready lock mutex
    defer mutex.Unlock() // unlock mutex when we return from function
    m[word] = added_word // result write to shared map
}

Note that in Go 1.9 there will be a Concurrent Map type.

Edit: You need to wait for all go-routines to finish because your main() now finishes before them. You can do this by using WaitGroup

package main

import (
    "fmt"
    "sync"
)

var mutex sync.Mutex
var wg sync.WaitGroup

func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    // do heavy work here
    //
    mutex.Lock()
    defer mutex.Unlock()
    m[word] = added_word
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        wg.Add(1)
        go add_to_map(words_map, this_word)
    }
    wg.Wait()
    fmt.Println(words_map)
}

donglongqiao9595
donglongqiao9595 不要在关键部分称睡眠为睡前。 (我假设睡眠模拟了计算时间)。 如果您的交流彼此独立,那是安全的。
大约 3 年之前 回复
douhui3305
douhui3305 我试过了; 如果您在上面运行我的示例,您会看到它不是同时运行的(一次触发所有4个动作,然后在每个结果出现时将结果添加到map中)。 这无助于加快我们的大查询,因为速度慢是由于等待每个查询运行并返回响应。
大约 3 年之前 回复
duanji9311
duanji9311 好吧,我看到您使用了延迟,让我尝试一下
大约 3 年之前 回复
droe9376
droe9376 修复了看看编辑部分
大约 3 年之前 回复
dongyijing2353
dongyijing2353 不起作用,请查看编辑
大约 3 年之前 回复



您的代码中有两个不同的问题:</ p>

1)即使您始终在写代码 设置为不同的键,则不能同时锁定地图: https://golang.org/ doc / faq#atomic_maps </ p>

因此,您只需要确保在访问地图时获得对地图的独占访问即可。</ p>

2)您需要先完成所有goroutine的操作,然后再打印地图(这就是为什么在您编辑的代码中得到不一致的结果的原因)</ p>

一种基于示例的解决这两个问题的简单方法: </ p>

 包main 

import(
“ fmt”
“ sync”

var互斥体同步。Mutex
var wg sync.WaitGroup

func add_to_map(m map [string]字符串,单词字符串){
延迟wg.Done()
add_word:=单词+“加上更多字母”
Mutex.Lock()
延迟Mutex.Unlock()

m [word] = add_word
}

func main(){
words_map:= make(map [st ring)
个单词:= [] string {“长颈鹿”,“猫”,“狗”,“乌龟”}
表示_,this_word:=范围词{
wg.Add(1)
转到add_to_map(words_map,this_word)
}
wg.Wait()
fmt.Println(words_map)
}
</ code> </ pre>
</ div>

展开原文

原文

You have two different issues in your code:

1) Even if you are always writing to different keys, you can't do that simultaneously without locking the map: https://golang.org/doc/faq#atomic_maps

So, you need to just make sure you get exclusive access to the map when accessing it.

2) You need to finish for all goroutines to finish before printing the map (that's why you get inconsistent results in your edited code)

A simple way to solve both issues based on your example:

package main

import (
    "fmt"
    "sync"
)

var mutex sync.Mutex
var wg sync.WaitGroup

func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    mutex.Lock()
    defer mutex.Unlock()
    m[word] = added_word
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        wg.Add(1)
        go add_to_map(words_map, this_word)
    }
    wg.Wait()
    fmt.Println(words_map)
}

doutuohan6606
doutuohan6606 对不起,不得不选一个,两个都是对的。 谢谢
大约 3 年之前 回复
doucan2102
doucan2102 繁重的操作应在Lock()之外进行,否则将不异步。
大约 3 年之前 回复
duan1226
duan1226 这确实是异步的,这4个查询将并行执行
大约 3 年之前 回复
duanluan8390
duanluan8390 尽管我对此表示赞赏,但这仍然行不通。 该模块的目标是立即触发所有4个查询,然后在完成后将其写入结果映射。 这样写,它既不异步也不比python更快。 我加了另一张纸条
大约 3 年之前 回复
doudula1974
doudula1974 这与互斥无关,但不等待go例程完成。.参见我编辑过的答案
大约 3 年之前 回复
doumenshi1475
doumenshi1475 不起作用,类似地请参见上面的编辑
大约 3 年之前 回复



(代表OP发布的解决方案)</ em>。</ p>

我对假延迟的使用是错误的,解决方案都可以。 谢谢:</ p>

 程序包main 

import(
“ fmt”
“数学/ rand”
“同步”
“时间”
)\ n
var互斥锁同步.Mutex
var wg同步.WaitGroup

func random_sleep(){
r:= rand.Intn(3000)
time.Sleep(time.Duration(r)* time.Millisecond)
}

func add_to_map(m map [string]字符串,字串){
延迟wg.Done()
add_word:= word +“和更多字母”
fmt.Println(“睡眠前” )
random_sleep()
Mutex.Lock()
延迟Mutex.Unlock()
m [word] = add_word
fmt.Println(“ Added word%v”,word)
}

func main(){
words_map:= make(map [string] string)
words:= [] string {“长颈鹿”,“猫”,“狗”,“乌龟”}
表示_,this_word :=范围词{
wg.Add(1)
go add_to_map(words_map,this_word)
}
wg.Wait()
fmt.Println(words_map)
}
</ code> < / pre>

结果:</ p>

  $去运行try_async.go 
睡前
睡前
睡前
睡前
添加了单词v狗
添加了单词%v长颈鹿
A ddd字%v cat
已添加字%v turtle
map [turtle:turtle加上更多字母dog:dog加上更多字母giraffe:giraffe加上更多字母cat:cat加上更多字母]
</ code> </ pre> \ n </ div>

展开原文

原文

(Posted solution on behalf of the OP).

My usage of fake delay was wrong, the solutions both work. Thank you:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

var mutex sync.Mutex
var wg sync.WaitGroup

func random_sleep() {
    r := rand.Intn(3000)
    time.Sleep(time.Duration(r) * time.Millisecond)
}


func add_to_map(m map[string] string, word string) {
    defer wg.Done()
    added_word := word + " plus more letters"
    fmt.Println("Before sleep")
    random_sleep()
    mutex.Lock()
    defer mutex.Unlock()
    m[word] = added_word
    fmt.Println("Added word %v", word)
}


func main() {
    words_map := make(map[string]string)
    words := []string{"giraffe", "cat", "dog", "turtle"}
    for _, this_word := range words {
        wg.Add(1)
        go add_to_map(words_map, this_word)
    }
    wg.Wait()
    fmt.Println(words_map)
}

Result:

$ go run try_async.go 
Before sleep
Before sleep
Before sleep
Before sleep
Added word %v dog
Added word %v giraffe
Added word %v cat
Added word %v turtle
map[turtle:turtle plus more letters dog:dog plus more letters giraffe:giraffe plus more letters cat:cat plus more letters]

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问