I'm trying to implement a generic search algorithm that will traverse any arbitrary tree-like data structure concurrently (spinning up a go routine for each level of the tree).
The issue I have is when search fails (it completes walking the entire tree and finds no match) I end up with a deadlock and I don't understand why.
The code works when correctly when search succeeds.
I added the helper functions for your context, however my issue is with the way I'm using go-routines and channels which is all in the concurrentSearch() function.
I apoligize in advance if you find the code messy. I've been going back and forth all day trying out different ideas.
Thanks for any feedback!
Here's my code:
package search
import "reflect"
// Search an arbitrary tree / map like data structure.
func concurrentSearch(output chan interface{}, data interface{}, key string) {
// search() is your search algorithm.
result := search(data, key)
if result != nil {
// Found something, so put it on the chan.
// This is the point of the function.
output <- result
return // Success.
}
// Since we didn't find a match we will get a slice of the next level of values.
iterableType := listValues(data)
if len(iterableType) == 0 {
return
}
var wg sync.WaitGroup
for _, value := range iterableType {
wg.Add(1)
go func(next interface{}) {
defer wg.Done()
concurrentSearch(output, next, key)
}(value)
}
wg.Wait()
// Wait never finishes, so the function never returns
}
// Returns a slice of values based on Type.
func listValues(data interface{}) []interface{} {
value := reflect.ValueOf(data)
values := []interface{}{}
switch value.Kind() {
case reflect.Map:
for _, key := range value.MapKeys() {
values = append(values, value.MapIndex(key).Interface())
}
case reflect.Slice:
for ii := 0; ii < value.Len(); ii++ {
values = append(values, value.Index(ii).Interface())
}
case reflect.Struct:
for ii := 0; ii < value.NumField(); ii++ {
if value.Field(ii).CanSet() {
values = append(values, value.Field(ii).Interface())
}
}
case reflect.Ptr:
rawValue := value.Elem()
if !rawValue.IsValid() {
return nil
}
values = listValues(rawValue)
case reflect.Interface:
rawValue := value.Elem()
values = listValues(rawValue)
}
return values
}
// search just checks the value for an index of key, otherwise returns nil.
func search(data interface{}, key string) interface{} {
value := reflect.ValueOf(data)
switch value.Kind() {
// If the type is indexable, check the index.
// Default to returning nil
case reflect.Struct:
for ii := 0; ii < value.NumField(); ii++ {
field := value.Field(ii)
// Without CanSet() reflect panics about Unexported fields.
if field.CanSet() {
if field.Type().Name() == key {
return field.Elem().Interface()
}
}
}
return nil
case reflect.Map:
for _, mapkey := range value.MapKeys() {
if key == mapkey.String() {
return value.MapIndex(mapkey).Elem().Interface()
}
}
return nil
case reflect.Slice:
return nil
// For pointer types we just unwrap and call again.
case reflect.Ptr:
rawValue := value.Elem()
if !rawValue.IsValid() {
return nil
}
return search(rawValue, key)
case reflect.Interface:
rawValue := value.Elem()
return search(rawValue, key)
case reflect.String:
return nil
default:
return nil
}
}
Here's my test code, unfortunately I can't share the test data. It's just a bunch of nested json objects.
package state
import (
"encoding/json"
"io/ioutil"
"reflect"
"testing"
)
const (
dataPath = "path/to/your/data.json"
)
var tests = []struct {
Input string
Result interface{}
}{
// your test fixtures here for table driven tests.
}
type mydata struct {
One map[string]interface{} `json:"1"`
Two map[string]interface{} `json:"2"`
Three map[string]interface{} `json:"3"`
}
func TestConcurrentSearch(t *testing.T) {
data := &mydata{
map[string]interface{}{},
map[string]interface{}{},
map[string]interface{}{},
}
bytes, err := ioutil.ReadFile(dataPath)
if err != nil {
t.Errorf("ioutil: problem loading test data, %v", err)
}
jerr := json.Unmarshal(bytes, data)
if jerr != nil {
t.Errorf("json: problem unmarshalling test data, %s", jerr)
}
// Setup finished
output := make(chan interface{})
for _, fixture := range tests {
go concurrentSearch(output, data, fixture.Input)
select {
case result := <-output:
if !reflect.DeepEqual(fixture.Result, result) {
t.Errorf("Expected: %v, Actual %v", fixture.Result, result)
}
}
}
}