I receive data over HTTP, and each piece of data needs to be processed by two different functions: one writes it to a file and the other writes it to a database. It is important that each function processes the data in the order it arrived, as a FIFO: if the file receives 1, 2, 3, 4, 5, the database must also record 1, 2, 3, 4, 5.

Now I have a problem. The data arrives continuously, and sometimes the database takes quite a long time to complete my update request; because of this, I cannot update the file in a timely manner. It is important to me that the data is added to the file or to the database as soon as each of them is able to accept it. I could use buffered channels, but I cannot know how much data may be waiting in the queue, and I would rather not pick some arbitrarily large buffer size. I tried starting additional goroutines in the NewData function, but in that case my data is not written sequentially.

The following code shows the problem:
package main

import (
	"fmt"
	"time"
)

type procHandler interface {
	Start()
	NewData(newdata []byte)
}

type fileWriter struct {
	Data chan []byte
}

func (proc *fileWriter) Start() {
	proc.Data = make(chan []byte)
	go func() {
		for {
			obj := <-proc.Data
			fmt.Printf("proc %T ", proc)
			fmt.Println(obj)
		}
	}()
}

func (proc *fileWriter) NewData(newdata []byte) {
	proc.Data <- newdata
}

type sqlWriter struct {
	Data chan []byte
}

func (proc *sqlWriter) Start() {
	proc.Data = make(chan []byte)
	go func() {
		for {
			obj := <-proc.Data
			time.Sleep(5 * time.Second)
			fmt.Printf("proc %T ", proc)
			fmt.Println(obj)
		}
	}()
}

func (proc *sqlWriter) NewData(newdata []byte) {
	proc.Data <- newdata
}

var processors = []procHandler{}

func receiver() {
	newDataImitateByteRange := 30
	for i := 0; i < newDataImitateByteRange; i++ {
		pseudoData := []byte{byte(i)}
		for _, handler := range processors {
			handler.NewData(pseudoData)
		}
	}
}

func main() {
	// file writer
	fileUpdate := &fileWriter{}
	processors = append(processors, fileUpdate)
	// sql writer
	sqlUpdate := &sqlWriter{}
	processors = append(processors, sqlUpdate)
	sqlUpdate.Start()
	fileUpdate.Start()
	go receiver()
	fmt.Scanln()
}
Runnable version on the Go Playground: https://play.golang.org/p/rSshsJYZ4h
Output:
proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.sqlWriter [0] (sleep)
proc *main.fileWriter [2] (displayed after 5 seconds, once the previous sqlWriter item has been processed)
proc *main.sqlWriter [1] (sleep)
proc *main.fileWriter [3] (displayed after 5 seconds, once the previous sqlWriter item has been processed)
proc *main.sqlWriter [2]
proc *main.fileWriter [4]
proc *main.sqlWriter [3]
proc *main.fileWriter [5]
proc *main.sqlWriter [4]
proc *main.fileWriter [6]
I want:
proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.fileWriter [2]
proc *main.fileWriter [3]
proc *main.fileWriter [4]
proc *main.fileWriter [5]
proc *main.fileWriter [6]
proc *main.sqlWriter [0] (the handler starts executing after 5 seconds have passed)
proc *main.sqlWriter [1] (sleep)
proc *main.sqlWriter [2] (sleep)
proc *main.sqlWriter [3] (sleep)
proc *main.sqlWriter [4] (sleep)
proc *main.sqlWriter [5] (sleep)
proc *main.sqlWriter [6] (sleep)
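
For illustration, here is a minimal sketch of one way the desired ordering above might be obtained. It assumes that an in-memory queue which is allowed to grow without bound is acceptable, and it places such a queue between the sender and each worker. The names `unboundedQueue` and `startWorker` are hypothetical helpers, not part of the code above, and the 50 ms delay merely stands in for the slow database call.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// unboundedQueue (hypothetical helper) forwards values from in to out in
// FIFO order. Items are buffered in a slice, so a sender on in only waits
// for this goroutine, never for a slow consumer of out.
func unboundedQueue(in <-chan []byte, out chan<- []byte) {
	defer close(out)
	var pending [][]byte
	for in != nil || len(pending) > 0 {
		// A nil channel blocks forever in a select, which disables the
		// send case while there is nothing to send.
		var send chan<- []byte
		var next []byte
		if len(pending) > 0 {
			send = out
			next = pending[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // input closed: drain what is left, then exit
				continue
			}
			pending = append(pending, v)
		case send <- next:
			pending = pending[1:]
		}
	}
}

// startWorker (hypothetical helper) starts a queue plus one consumer
// goroutine, so items for this worker are handled strictly in order.
func startWorker(name string, delay time.Duration, wg *sync.WaitGroup) chan<- []byte {
	in := make(chan []byte)
	out := make(chan []byte)
	go unboundedQueue(in, out)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for obj := range out {
			time.Sleep(delay) // stands in for the slow write
			fmt.Println(name, obj)
		}
	}()
	return in
}

func main() {
	var wg sync.WaitGroup
	file := startWorker("fileWriter", 0, &wg)
	sql := startWorker("sqlWriter", 50*time.Millisecond, &wg)
	for i := 0; i < 7; i++ {
		data := []byte{byte(i)}
		file <- data
		sql <- data
	}
	close(file)
	close(sql)
	wg.Wait()
}
```

With this arrangement each worker sees its items in arrival order, and the fast worker is not held back by the slow one. The trade-off is that memory use grows for as long as the slow worker keeps falling behind, so a cap or some monitoring would probably still be needed in practice.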
Any help would be appreciated, thank you!