dongying9712 2019-03-05 11:47
浏览 174
已采纳

并行执行DynamoDB查询(全局二级索引的BatchGetItems)

The idea here is run multiple DynamoDB queries in parallel as the query is run over a GSI. As of now BatchGetItems doesn't support querying over Indexes and the recommended approach is to Query the data in parallel. I'm using go routines with wg to take care of the executions of the routines in parallel.

The input to the function is an array of strings with an ID, the outputs are the attributes of the Ids.

When the function is run locally, there is no issue, however, when the function is run on AWS-Lambda, the returned data keeps growing;

ie; Input 2 items should output 2 items. If the function is tested on AWS-Lambda,

  • 1st time the function returns 2 item
  • 2nd time it returns 4 items (same items are repeated 2 times)
  • 3rd time it returns 6 items (same items are repeated 4 times)

and so on. Here is a snippet of the code. Is there something not handled correctly that's having the lambda output the extra set of data every-time the lambda is run?

package main

import (
    "context"
    "fmt"
    "os"
    "sync"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
    "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)

//Final Output Interface
var bulkOutput []interface{}

func exitWithError(err error) {
    fmt.Fprintln(os.Stderr, err)
    os.Exit(1)
}

//LambdaInputJSON input for the lambda handler
type LambdaInputJSON struct {
    Ids      []string `json:"ids,omitempty"`
}

//HandleRequest : Lambda entry point
func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
    return DynamoDBBatchGetRecords(data), nil
}

func main() {
    lambda.Start(HandleRequest)
}

func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {

    var wg sync.WaitGroup
    var mutex = &sync.Mutex{}

    iterations := len(a.Ids)
    wg.Add(iterations)
    for i := 0; i < iterations; i++ {
        go QueryOutput(a.Ids[i], &wg, mutex)
    }

    wg.Wait()
    return bulkOutput

}

//QueryOutput GoRoutine
func QueryOutput(data string, wg *sync.WaitGroup, mtx *sync.Mutex) {
    var outputData []interface{}
    defer wg.Done()
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String("aws-region"),
    })
    if err != nil {
        exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
    }
    ddb := dynamodb.New(sess)
    queryInput := &dynamodb.QueryInput{
        Limit:                aws.Int64(1),
        TableName:            aws.String("table-name"),
        IndexName:            aws.String("gsi-index"),
        ScanIndexForward:     aws.Bool(false),
        ConsistentRead:       aws.Bool(false),
        KeyConditions: map[string]*dynamodb.Condition{
            "column_name": {
                ComparisonOperator: aws.String("EQ"),
                AttributeValueList: []*dynamodb.AttributeValue{
                    {
                        S: aws.String(data),
                    },
                },
            },
        },
    }
    output, err := ddb.Query(queryInput)
    if err != nil {
        exitWithError(fmt.Errorf("Failed to make Query API call, %v", err))
    }
    err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
    if err != nil {
        exitWithError(fmt.Errorf("Failed to unmarshal Query result items, %v", err))
    }
    mtx.Lock()
    bulkOutput = append(bulkOutput, outputData)
    mtx.Unlock()
}
  • 写回答

1条回答 默认 最新

  • doufen3786 2019-03-07 08:56
    关注

    According to documentation, global variables are independent of your Lambda function's handler code. This was causing the buffer to build up over time.

    Rectified reference pasted below.

    package main
    
    import (
        "context"
        "fmt"
        "os"
        "sync"
    
        "github.com/aws/aws-lambda-go/lambda"
        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/dynamodb"
        "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
    )
    
    func exitWithError(err error) {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    
    //HandleRequest : Lambda entry point
    func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {
        output := DynamoDBBatchGetRecords(data)
        return output, nil
    }
    
    func main() {
        lambda.Start(HandleRequest)
    }
    
    func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {
        var dataOut []interface{}
        var wg = &sync.WaitGroup{}
        var mtx = &sync.Mutex{}
    
        iterations := len(a.Ids)
        wg.Add(iterations)
        for i := 0; i < i; i++ {
            go func(i int) {
                defer wg.Done()
                var outputData []interface{}
                sess, err := session.NewSession(&aws.Config{
                    Region: aws.String("aws-region"),
                })
                if err != nil {
                    exitWithError(fmt.Errorf("failed to make Query API call, %v", err))
                }
                ddb := dynamodb.New(sess)
                queryInput := &dynamodb.QueryInput{
                    Limit:            aws.Int64(1),
                    TableName:        aws.String("table"),
                    IndexName:        aws.String("index"),
                    ScanIndexForward: aws.Bool(false),
                    ConsistentRead: aws.Bool(false),
                    KeyConditions: map[string]*dynamodb.Condition{
                        "index-column": {
                            ComparisonOperator: aws.String("EQ"),
                            AttributeValueList: []*dynamodb.AttributeValue{
                                {
                                    S: aws.String(a.Ids[i]),
                                },
                            },
                        },
                    },
                }
                output, err := ddb.Query(queryInput)
    
                if err != nil {
                    exitWithError(fmt.Errorf("E1 failed to make Query API call, %v", err))
                }
                err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)
                if err != nil {
                    exitWithError(fmt.Errorf("E2 failed to unmarshal Query result items, %v", err))
                }
    
                mtx.Lock()
                dataOut = append(dataOut, outputData[0])
                mtx.Unlock()
    
            }(i)
        }
        wg.Wait()
        return dataOut
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 乘性高斯噪声在深度学习网络中的应用
  • ¥15 运筹学排序问题中的在线排序
  • ¥15 关于docker部署flink集成hadoop的yarn,请教个问题 flink启动yarn-session.sh连不上hadoop,这个整了好几天一直不行,求帮忙看一下怎么解决
  • ¥30 求一段fortran代码用IVF编译运行的结果
  • ¥15 深度学习根据CNN网络模型,搭建BP模型并训练MNIST数据集
  • ¥15 C++ 头文件/宏冲突问题解决
  • ¥15 用comsol模拟大气湍流通过底部加热(温度不同)的腔体
  • ¥50 安卓adb backup备份子用户应用数据失败
  • ¥20 有人能用聚类分析帮我分析一下文本内容嘛
  • ¥30 python代码,帮调试,帮帮忙吧