I'm trying to build a system using the fan-out / fan-in design to call multiple rest API.
My fan out would receive a list of users. Foreach user, execute an other time the fan-out to create multiple http get and then aggregate the data in that user.
This is my fan out method.
in <- chan struct{} is my channel with the user information
f func(struct{}) struct{} is the method that will take a user as a parameter, call an other instance of my fan out, merge the data in the user and then return the aggregated user
<- chan struct{} is the new channel with the result
func split(in <- chan struct{}, f func(struct{}) struct{}) <- chan struct{} {
out := make(chan struct{})
go func() {
for data := range in {
out <- f(data)
}
close(out)
}()
return out
}
The merge method is used to take multiple channel and merge them to a single channel
func merge(in ...<- chan struct{}) <- chan struct{} {
out := make(chan struct{})
var wg sync.WaitGroup
output := func(channel <- chan struct{}) () {
for data := range channel {
out <- data
}
wg.Done()
}
for _, channel := range in {
go output(channel)
}
go func() {
wg.Wait()
close(out)
}()
wg.Add(len(in))
return out
}
Process method used to create a fan-out / fan-in system
func Process(dataSlice []struct{}, f func(struct{}) struct{}) <-chan struct{} {
in := groupData(dataSlice) // this method is simply taking a slice of struct and returning a channel of struct with the values inside
out1 := split(in, f)
out2 := split(in, f)
out3 := split(in, f)
return merge(out1, out2, out3)
}
Finally the method calling this thing out
type Request struct {
Name string
}
type Response struct {
Name string
Age int
}
func main() {
users := make([]Request, 0)
users = append(users, Request{Name: "A"})
users = append(users, Request{Name: "B"})
users = append(users, Request{Name: "C"})
users = append(users, Request{Name: "D"})
for data := range parallel.Process(users, apply) {
fmt.Println(data)
}
}
func apply(request Request) Response {
return Response{request.Name, 1}
}
So my apply method is really small here. In reality i am calling the process method again to split my rest api call in multiple channels and retrieve all of the information for that request. My response struct would depend on the api i called.
I'm getting the following errors.
- cannot use users (type []Request) as type []struct {} in argument to parallel.Process
- cannot use apply (type func(Request) struct {}) as type parallel.ProcessData in argument to parallel.Process
- cannot use Response literal (type Response) as type struct {} in return argument
So basically i can't make a method that accepts any struct or some sort of interface of a request.
Thanks :)
Update:
Using interface{} and the reflect lib i managed to get this piece of code working.
func apply(request interface{}) interface{} {
if(reflect.TypeOf(request).Name() != reflect.TypeOf(Request{}).Name()) {
panic("invalid request type")
}
value := reflect.ValueOf(request)
typedRequest := value.Interface().(Request)
return Response{Name: typedRequest.Name, Age: rand.Intn(100)}
}
This feels wrong.
Since there is no generic type, are we supposed to re implement the same method multiple time for each type we want to use? What would be the suggested approach to solve in a proper way?