duanmoen784988 2017-10-02 04:17

Data processing by multiple functions is asynchronous

I receive data via HTTP, and this data needs to be processed by two different functions. It is important that each function processes the items in the order they arrive, as in a FIFO model. For example, if the file ends up containing 1,2,3,4,5, the database should also record 1,2,3,4,5.

Here is my problem. The data arrives continuously, and sometimes the database takes quite a long time to complete an update request. Because of this, I cannot update the file in a timely manner. What matters to me is that each item is added to the file or to the database as soon as that destination is able to accept it. I could use buffered channels, but I cannot know how much data may be waiting in the queue, and I would rather not pick a buffer size just because it is certain to be large enough. I tried starting additional goroutines in the NewData function, but in that case my data is not written sequentially.

This code shows the problem.

package main

import (
    "fmt"
    "time"
)

type procHandler interface {
    Start()
    NewData(newdata []byte)
}

type fileWriter struct {
    Data chan []byte
}

func (proc *fileWriter) Start() {
    proc.Data = make(chan []byte)
    go func() {
        for {
            obj := <-proc.Data

            fmt.Printf("proc %T ", proc)
            fmt.Println(obj)
        }
    }()
}

func (proc *fileWriter) NewData(newdata []byte) {
    proc.Data <- newdata
}

type sqlWriter struct {
    Data chan []byte
}

func (proc *sqlWriter) Start() {
    proc.Data = make(chan []byte)
    go func() {
        for {
            obj := <-proc.Data
            time.Sleep(5 * time.Second)
            fmt.Printf("proc %T ", proc)
            fmt.Println(obj)
        }
    }()
}

func (proc *sqlWriter) NewData(newdata []byte) {
    proc.Data <- newdata
}

var processors = []procHandler{}

func receiver() {
    newDataImitateByteRange := 30
    for i := 0; i < newDataImitateByteRange; i++ {
        pseudoData := []byte{byte(i)}

        for _, handler := range processors {
            handler.NewData(pseudoData)
        }
    }
}

func main() {
    // file writer
    fileUpdate := &fileWriter{}
    processors = append(processors, fileUpdate)

    // sql writer
    sqlUpdate := &sqlWriter{}
    processors = append(processors, sqlUpdate)

    sqlUpdate.Start()
    fileUpdate.Start()

    go receiver()

    fmt.Scanln()
}

Runnable code: https://play.golang.org/p/rSshsJYZ4h

Output:

proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.sqlWriter [0] (sleep)
proc *main.fileWriter [2] (Display after 5 seconds when the previous channel is processed)
proc *main.sqlWriter [1] (sleep)
proc *main.fileWriter [3] (Display after 5 seconds when the previous channel is processed)
proc *main.sqlWriter [2]
proc *main.fileWriter [4]
proc *main.sqlWriter [3]
proc *main.fileWriter [5]
proc *main.sqlWriter [4]
proc *main.fileWriter [6]

I want:

proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.fileWriter [2]
proc *main.fileWriter [3]
proc *main.fileWriter [4]
proc *main.fileWriter [5]
proc *main.fileWriter [6]
proc *main.sqlWriter [0] (after 5 seconds have passed, the handler starts executing)
proc *main.sqlWriter [1] (sleep)
proc *main.sqlWriter [2] (sleep)
proc *main.sqlWriter [3] (sleep)
proc *main.sqlWriter [4] (sleep)
proc *main.sqlWriter [5] (sleep)
proc *main.sqlWriter [6] (sleep)

I hope for help, thank you!

1 answer

  • duanbi3151 2017-10-07 00:24

    It sounds like you are looking for something that works like a channel but resizes (grows or shrinks) with the data that is enqueued on it. This could be implemented by placing a queue between an input channel and an output channel, with a goroutine to service those channels. Here is such a solution: https://github.com/gammazero/bigchan#bigchan

    I have used a BigChan as the Data channel in your fileWriter and sqlWriter, and it appears to produce the results you are looking for. Here is your code, reworked:

    package main
    
    import (
        "fmt"
        "time"
    
        "github.com/gammazero/bigchan"
    )
    
    // Maximum number of items to buffer. Set to -1 for unlimited.
    const limit = 65536
    
    type procHandler interface {
        Start()
        NewData(newdata []byte)
    }
    
    type fileWriter struct {
        Data *bigchan.BigChan
    }
    
    func (proc *fileWriter) Start() {
        proc.Data = bigchan.New(limit)
        go func() {
            for {
                _obj := <-proc.Data.Out()
                obj := _obj.([]byte)
    
                fmt.Printf("proc %T ", proc)
                fmt.Println(obj)
            }
        }()
    }
    
    func (proc *fileWriter) NewData(newdata []byte) {
        proc.Data.In() <- newdata
    }
    
    type sqlWriter struct {
        Data *bigchan.BigChan
    }
    
    func (proc *sqlWriter) Start() {
        proc.Data = bigchan.New(limit)
    
        go func() {
            for {
                _obj := <-proc.Data.Out()
                obj := _obj.([]byte)
                time.Sleep(5 * time.Second)
                fmt.Printf("proc %T ", proc)
                fmt.Println(obj)
            }
        }()
    }
    
    func (proc *sqlWriter) NewData(newdata []byte) {
        proc.Data.In() <- newdata
    }
    
    var processors = []procHandler{}
    
    func receiver() {
        newDataImitateByteRange := 30
        for i := 0; i < newDataImitateByteRange; i++ {
            pseudoData := []byte{byte(i)}
    
            for _, handler := range processors {
                handler.NewData(pseudoData)
            }
        }
    }
    
    func main() {
        // file writer
        fileUpdate := &fileWriter{}
        processors = append(processors, fileUpdate)
    
        // sql writer
        sqlUpdate := &sqlWriter{}
        processors = append(processors, sqlUpdate)
    
        sqlUpdate.Start()
        fileUpdate.Start()
    
        go receiver()
    
        fmt.Scanln()
    }
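
The idea described at the start of this answer, a queue sitting between an input channel and an output channel with one goroutine servicing both, can also be sketched with the standard library alone. The block below is a minimal illustration of that idea, not the code bigchan actually uses; the helper name `unbounded` is made up for this example, and the sketch assumes it is acceptable for memory to grow without limit while the consumer is slow.

```go
package main

import "fmt"

// unbounded is a hypothetical helper (not part of bigchan). It forwards
// items from in to out through a slice-backed FIFO queue, so a sender on
// in is not held up while the consumer of out is busy. Order is preserved.
func unbounded(in <-chan []byte, out chan<- []byte) {
	var queue [][]byte
	for in != nil || len(queue) > 0 {
		// Offer a send only when something is queued. A nil channel
		// in a select case never becomes ready, which disables it.
		var send chan<- []byte
		var next []byte
		if len(queue) > 0 {
			send = out
			next = queue[0]
		}
		select {
		case item, ok := <-in:
			if !ok {
				in = nil // input closed: stop receiving, drain the queue
				continue
			}
			queue = append(queue, item)
		case send <- next:
			queue = queue[1:]
		}
	}
	close(out)
}

func main() {
	in := make(chan []byte)
	out := make(chan []byte)
	go unbounded(in, out)

	// Nothing reads from out yet, but these sends still complete
	// because the queue absorbs the items.
	for i := 0; i < 5; i++ {
		in <- []byte{byte(i)}
	}
	close(in)

	// Items come out in the order they went in.
	for item := range out {
		fmt.Println(item)
	}
}
```

With one such queue per writer, a slow sqlWriter no longer delays the fileWriter, and each writer still sees the items in arrival order. The trade-off is that the backlog lives in memory, so some upper limit (like the `limit` constant above) is usually worth keeping.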
    